package org.zjvis.datascience.service;

import static org.zjvis.datascience.common.constant.DatabaseConstant.DEFAULT_DATASET_ID;
import static org.zjvis.datascience.common.enums.ActionEnum.*;

import cn.hutool.core.util.ObjectUtil;
import cn.hutool.db.Entity;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.google.common.base.Joiner;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.sql.*;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import lombok.SneakyThrows;
import net.sourceforge.pinyin4j.format.exception.BadHanyuPinyinOutputFormatCombination;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.zjvis.datascience.common.constant.Constant;
import org.zjvis.datascience.common.constant.DatasetConstant;
import org.zjvis.datascience.common.constant.SemanticConstant;
import org.zjvis.datascience.common.constant.SqlTemplate;
import org.zjvis.datascience.common.dto.TaskDTO;
import org.zjvis.datascience.common.dto.TaskInstanceDTO;
import org.zjvis.datascience.common.model.Column;
import org.zjvis.datascience.common.util.task.TaskInstanceDTOUtil;
import org.zjvis.datascience.common.util.task.TaskUtil;
import org.zjvis.datascience.common.widget.dto.WidgetDTO;
import org.zjvis.datascience.common.enums.ActionEnum;
import org.zjvis.datascience.common.enums.AlgPyEnum;
import org.zjvis.datascience.common.enums.DataTypeEnum;
import org.zjvis.datascience.common.enums.ETLEnum;
import org.zjvis.datascience.common.enums.PythonDateTypeFormatEnum;
import org.zjvis.datascience.common.enums.TaskInstanceStatus;
import org.zjvis.datascience.common.enums.TaskTypeEnum;
import org.zjvis.datascience.common.exception.BaseErrorCode;
import org.zjvis.datascience.common.exception.DataScienceException;
import org.zjvis.datascience.common.model.ApiResultCode;
import org.zjvis.datascience.common.model.ConfigComponent;
import org.zjvis.datascience.common.model.Table;
import org.zjvis.datascience.common.model.stat.ColumnAction;
import org.zjvis.datascience.common.model.stat.ColumnConstant;
import org.zjvis.datascience.common.model.stat.ColumnCounter;
import org.zjvis.datascience.common.model.stat.ColumnRange;
import org.zjvis.datascience.common.model.stat.ColumnStat;
import org.zjvis.datascience.common.model.stat.Range;
import org.zjvis.datascience.common.sql.DataCleanSqlHelper;
import org.zjvis.datascience.common.sql.SqlHelper;
import org.zjvis.datascience.common.sql.SqlSyntaxHelper;
import org.zjvis.datascience.common.sql.TransFormSqlHelper;
import org.zjvis.datascience.common.util.*;
import org.zjvis.datascience.common.util.db.JDBCUtil;
import org.zjvis.datascience.common.vo.PercentageDataVO;
import org.zjvis.datascience.common.vo.TaskInstanceVO;
import org.zjvis.datascience.common.vo.column.ColumnQueryVO;
import org.zjvis.datascience.common.vo.column.MultiColumnQueryVO;
import org.zjvis.datascience.common.vo.dataset.HeadVO;
import org.zjvis.datascience.service.dag.TaskFuture;
import org.zjvis.datascience.service.dag.TaskManager;
import org.zjvis.datascience.service.dag.TaskRunnerResult;
//import org.zjvis.datascience.service.dataprovider.GPDataProvider;
import org.zjvis.datascience.service.dataprovider.GPDataProvider;
import org.zjvis.datascience.service.mapper.TaskInstanceMapper;
import org.zjvis.datascience.service.mapper.TaskMapper;
import org.zjvis.datascience.service.mapper.WidgetMapper;

import javax.servlet.ServletContext;

/**
 * @description TColumn 数据字段结果渲染服务 Service
 * @date 2021-11-29
 */
@Service
@SuppressWarnings("all")
public class TColumnService {

    private final static Logger logger = LoggerFactory.getLogger("TColumnService");

    @Autowired
    private GPDataProvider gpDataProvider;

    @Autowired
    private TaskInstanceMapper taskInstanceMapper;

    @Autowired
    private SemanticService semanticService;

    @Autowired
    private TaskMapper taskMapper;

    @Autowired
    private TaskInstanceService taskInstanceService;

    @Lazy
    @Autowired
    private TaskService taskService;

    @Autowired
    private WidgetMapper widgetMapper;

    @Autowired
    private ActionService actionService;

    @Autowired
    private UrbanDataService urbanDataService;

    @Autowired
    private RedisUtil redisUtil;

    @Autowired
    private DateService dateService;

    @Autowired
    private ServletContext servletContext;

    private final Cache<Long, String> cache = CacheBuilder.newBuilder()
        .expireAfterWrite(2, TimeUnit.SECONDS).build();

    private static final String DETAIL_SQL = "select \"page\",sum(\"num\") as \"sum\" from \n"
        + "(select \"num\",((row_number() over(order by \"%s\" %s))-1)/%d as page from \n"
        + "(select \"%s\",count(*) as num from %s %s group by \"%s\") a) b \n"
        + "group by \"page\" order by \"page\"";

    private static final String DETAIL_FILTER_SQL =
        "select \"page\",sum(\"num\") as \"sum\",sum(\"filter\") as \"filter\" from \n"
            + "(select \"filter\",\"num\",((row_number() over(order by \"%s\" %s))-1)/%d as page from \n"
            + "(select \"%s\",count(*) as num,count(CASE WHEN %s THEN 1 END) as \"filter\" from %s %s group by \"%s\") a) b \n"
            + "group by \"page\" order by \"page\"";

    private static final String QUERY_TOTAL_SQL = "select count(1) from %s %s";
    private static final String QUERY_NULL_SQL = "select count(1) %s from %s where \"%s\" is null";
    private static final String QUERY_NULL_CHINESE_SQL = "select count(1) %s from %s where \"%s\"::NUMERIC is null";
    private static final String SAMPLE_TABLE_SQL = "(select * from %s limit 1000000) sample_t";
    private static final String SAMPLE_TABLE_LIMIT = "(select * from %s limit 1000000) sample_t";
    private static final String VISITOR_TABLE_LIMIT = "(select * from %s limit %s) sample_t";
    private static final String DELETE_REDUNDANT_DATA_SQL = "delete from %s \n"
        + "where gp_segment_id::varchar(100)||ctid::varchar(100) in \n"
        + "(select t.ctid from (select  gp_segment_id::varchar(100)||ctid::varchar(100) as ctid, row_number() over (partition by \"_record_id_\") rows_index\n"
        + "from %s ) t \n"
        + "where t.rows_index >= 2);";

    private static final Integer LIMIT_RECORDS = 2000000;

    private final static Long CACHE_EXPIRE_TIME = 1 * 60 * 60L; // 1 hour

    public String getTableInModelTask(TaskDTO taskDTO) {
        try {
            String dataJson = taskDTO.getDataJson();
            JSONObject data = JSONObject.parseObject(dataJson);
            JSONArray outputarray = data.getJSONArray("output");
            JSONObject output = outputarray.getJSONObject(0);
            String otableName = output.getString("tableName");
            if (!otableName.contains(".")) {
                otableName = "dataset." + otableName;
            }
            return otableName;
        } catch (Exception e) {
            logger.info("unable to set table name");
        }
        return "";
    }

    public ColumnStat querySimpleStat(ColumnQueryVO vo, Boolean needPageInfo, Boolean needRangeInfo) {
        return queryStat(vo, needPageInfo, needRangeInfo);
    }

    public ColumnStat queryStat(ColumnQueryVO vo) {
        return queryStat(vo, true, true);
    }

    public ColumnStat queryStat(ColumnQueryVO vo, Boolean pageCountFlag, Boolean rangeCountFlag) {
        int type = vo.getType();
        TaskDTO taskDTO = taskMapper.queryById(vo.getTaskId());
        String name = vo.getName();
        String highLightType = "highLight";

        if ((vo.getFilter() == null || vo.getFilter().size() == 0) &&
                (null != taskDTO && taskDTO.getType().equals(TaskTypeEnum.TASK_TYPE_ALGOPY.getVal()))) {
            JSONObject dataJson = JSONObject.parseObject(taskDTO.getDataJson());
            int algPyType = dataJson.getInteger("algType");
            JSONObject data = dataJson.getJSONObject("highlight");
            JSONArray values = new JSONArray();
            if (data != null && data.getJSONArray(name) != null) {
                values = data.getJSONArray(name);
            }
            if (algPyType == AlgPyEnum.ANOMALY_STAT.getVal() || algPyType == AlgPyEnum.ANOMALY_KNN.getVal()) {
                highLightType = "anomaly";
            } else if (algPyType == AlgPyEnum.IMPUTATION_STAT.getVal() || algPyType == AlgPyEnum.IMPUTATION_MULTI.getVal()) {
                highLightType = "imputation";
            }
            if (values.size() > 0) {
                JSONObject newFilter = new JSONObject();
                newFilter.put("col", vo.getName());
                newFilter.put("values", values);
                newFilter.put("filterType", "=");
                JSONArray tmp = new JSONArray();
                tmp.add(newFilter);
                JSONArray newFilters = new JSONArray();
                newFilters.add(tmp);
                vo.setFilter(newFilters);
            }
        }

        if (null != taskDTO && taskDTO.getType().equals(TaskTypeEnum.TASK_TYPE_MODEL.getVal())) {
            vo.setTable(getModelTaskOutputTable(taskDTO));
        }
        //校验表名
        String alignTable = ToolUtil.alignTableName(vo.getTable(), 0L);
        Map<String, Integer> cols = gpDataProvider
            .getColumnTypesOriginal(new Table(DEFAULT_DATASET_ID, alignTable));

        String sampleTable;
        sampleTable = String.format(SAMPLE_TABLE_LIMIT, alignTable);
        if (vo.getNeedLimit()){ //针对访客 截取部分数据 [queryStat]
            Long totalRowNum = gpDataProvider.getTotalRowNum(new Table(DEFAULT_DATASET_ID, alignTable));
            sampleTable = String.format(VISITOR_TABLE_LIMIT, alignTable, Math.min(10, totalRowNum/2));
        }
        vo.setTable(sampleTable);
        vo.setName(vo.getName().replaceAll("\"", "\"\""));
        String filter = filter(vo, false);
        String searchFilter = filter(vo, true);
        ColumnStat stat = new ColumnStat(vo.getTable(), vo.getName(), vo.getType());
        JSONObject data = vo.getData();
        JSONObject basic;
        //basic不变的查询，由前端传入basic
        if (CollectionUtil.isNotEmpty(data) && null != data.getJSONObject("topColumn")) {
            JSONObject topColumn = data.getJSONObject("topColumn");
            data.put("topColumnCount", topColumn.getInteger("count"));
            data.put("topColumn", topColumn.getObject("column", Object.class));
            basic = data;
        } else {
            basic = getBasicStat(vo, "", searchFilter);
        }
        if (basic.getIntValue("total") == 0) {
            return null;
        }
        List<Range> ranges = null;
        int total = basic.getIntValue("total");
        int distinctTotal = basic.getIntValue("distinctTotal");
        stat.setTotal(total);
        stat.setDistinctTotal(distinctTotal);

        int mode = vo.getMode();
        try {
            if (type == Types.DECIMAL || type == Types.NUMERIC) {
                stat.setMin(basic.getBigDecimal("min"));
                stat.setMax(basic.getBigDecimal("max"));
                stat.setTopColumn(
                        new ColumnCounter(basic.getBigDecimal("topColumn"),
                                basic.getInteger("topColumnCount")));
                if (mode == ColumnConstant.MODE_ABSTRACT) {
                    ranges = NumericUtil
                            .divisionBigDecimal(basic.getBigDecimal("min"), basic.getBigDecimal("max"),
                                    ColumnConstant.STAT_ABSTRACT_SEGMENT);
                }
            } else if (type == Types.FLOAT || type == Types.DOUBLE) {
                stat.setMin(basic.getDouble("min"));
                stat.setMax(basic.getDouble("max"));
                stat.setTopColumn(
                        new ColumnCounter(basic.getDouble("topColumn"),
                                basic.getInteger("topColumnCount")));
                if (mode == ColumnConstant.MODE_ABSTRACT) {
                    ranges = NumericUtil.divisionDouble(basic.getDouble("min"), basic.getDouble("max"),
                            ColumnConstant.STAT_ABSTRACT_SEGMENT);
                }
            } else if (type == Types.BIGINT) {
                stat.setMin(basic.getLong("min"));
                stat.setMax(basic.getLong("max"));
                stat.setTopColumn(
                        new ColumnCounter(basic.getLong("topColumn"),
                                basic.getInteger("topColumnCount")));
                if (mode == ColumnConstant.MODE_ABSTRACT) {
                    ranges = NumericUtil.divisionLong(basic.getLong("min"), basic.getLong("max"),
                            ColumnConstant.STAT_ABSTRACT_SEGMENT);
                }
            } else if (type == Types.SMALLINT || type == Types.INTEGER) {
                stat.setMin(basic.getInteger("min"));
                stat.setMax(basic.getInteger("max"));
                stat.setTopColumn(
                        new ColumnCounter(basic.getInteger("topColumn"),
                                basic.getInteger("topColumnCount")));
                if (mode == ColumnConstant.MODE_ABSTRACT) {
                    ranges = NumericUtil
                            .divisionInteger(basic.getInteger("min"), basic.getInteger("max"),
                                    ColumnConstant.STAT_ABSTRACT_SEGMENT);
                }
            } else if (SqlSyntaxHelper.isDate(type)) {
                stat.setMin(basic.getString("min"));
                stat.setMax(basic.getString("max"));
                stat.setTopColumn(
                    new ColumnCounter(basic.getString("topColumn"),
                        basic.getInteger("topColumnCount")));
                if (mode == ColumnConstant.MODE_ABSTRACT) {
                    ranges = NumericUtil.divisionLong(basic.getTimestamp("min").getTime(),
                            basic.getTimestamp("max").getTime(),
                            ColumnConstant.STAT_ABSTRACT_SEGMENT);
                }
            } else {//除了数值都存成字符串
                stat.setMin(basic.getString("min"));
                stat.setMax(basic.getString("max"));
                stat.setTopColumn(
                        new ColumnCounter(basic.getString("topColumn"),
                                basic.getInteger("topColumnCount")));
            }
        }catch (Exception e){
            //出现异常，建议直接设置对应的类型为string
            stat.setMin(basic.getString("min"));
            stat.setMax(basic.getString("max"));
            stat.setTopColumn(
                    new ColumnCounter(basic.getString("topColumn"),
                            basic.getInteger("topColumnCount")));
        }
        if (rangeCountFlag) {
            List<ColumnRange> rangeCount = null;
            if (mode == ColumnConstant.MODE_DETAIL || null == ranges) {
                //详细模式右侧概览数据获取
                rangeCount = getDetailRangeCount(vo, filter, searchFilter, distinctTotal, highLightType);
            } else if (mode == ColumnConstant.MODE_ABSTRACT) {
                //摘要模式获取摘要的计数值
                rangeCount = getRangeCount(vo, ranges, filter, highLightType);
            }
            stat.setRangeCount(rangeCount);
        }
        if (pageCountFlag && (mode == ColumnConstant.MODE_DETAIL || null == ranges)) {
            //详细模式的分页数据获取
            Page<ColumnCounter> pageCount = getPageCount(vo, stat, filter, searchFilter, alignTable, highLightType);

            if (CollectionUtil.isNotEmpty(data) &&
                    StringUtils.isNotEmpty(data.getString("semantic")) &&
                    CollectionUtil.isNotEmpty(pageCount.getData())) {
                List<ColumnCounter> dataList = pageCount.getData();
                for (ColumnCounter cc : dataList) {
                    cc.setIsMatching(urbanDataService
                            .ifMatching(String.valueOf(cc.getColumn()),
                                    data.getString("semantic")));
                }
            }
            stat.setPageCount(pageCount);
        }
        queryNullCounter(vo, stat, filter, searchFilter);
        return stat;
    }

    private void queryNullCounter(ColumnQueryVO vo, ColumnStat stat, String filter,
        String searchFilter) {
        String defaultDataSourceKey = servletContext.getAttribute(Constant.DEFAULT_DATA_SOURCE_KEY).toString();
        if (StringUtils.isNotEmpty(searchFilter) || new Integer(0).equals(stat.getTotal())) {
            return;
        }
        ColumnCounter nullCounter = new ColumnCounter(null, 0);
        String sql;
        if (StringUtils.isNotEmpty(filter)) {
            sql = String.format(QUERY_NULL_SQL,
                String.format(",count(case when %s then 1 end)", filter.replace("WHERE", "")),
                stat.getTable(), stat.getName());
        } else {
            if (StringUtil.hasChinese(vo.getName())) {
                if (SqlSyntaxHelper.isNumber(stat.getType())) {
                    sql = String
                        .format(QUERY_NULL_CHINESE_SQL, "", stat.getTable(), stat.getName());
                } else {
                    sql = String.format(QUERY_NULL_SQL, "", stat.getTable(), stat.getName());
                }
            } else {
                sql = String.format(QUERY_NULL_SQL, "", stat.getTable(), stat.getName());
            }

        }
        Connection conn = null;
        try {
            conn = gpDataProvider.getConn(DEFAULT_DATASET_ID);
            Statement st = conn.createStatement();
            ResultSet rs = st.executeQuery(sql);
            if (rs.next()) {
                nullCounter.setCount(rs.getInt(1));
                if (StringUtils.isNotEmpty(filter)) {
                    nullCounter.setHighLight(rs.getInt(2));
                }
            }
        } catch (Exception e) {
            logger.error("TColumnService.queryNullCounter error, msg={}, sql={}", e.getMessage(),
                sql);
        }
        finally {
            JDBCUtil.close(conn, null, null);
        }
        if (nullCounter.getCount() != 0) {
            if (CollectionUtil.isEmpty(vo.getData())) {
                stat.setDistinctTotal(stat.getDistinctTotal() + 1);
            }
            ColumnCounter topColumn = stat.getTopColumn();
            Page<ColumnCounter> pageCount = stat.getPageCount();
            List<ColumnRange> rangeCount = stat.getRangeCount();
            if (null != topColumn && topColumn.getCount() < nullCounter.getCount()) {
                stat.setTopColumn(nullCounter);
            }
            if (null != pageCount && pageCount.getCurPage() == 1) {
                pageCount.getData().add(0, nullCounter);
            }
            if (null != rangeCount) {
                rangeCount.add(0,
                    new ColumnRange(null, null, nullCounter.getCount(),
                        nullCounter.getHighLight(), "highLight"));
            }
        }
    }

    /**
     * 详细模式右侧概览数据获取
     */
    private List<ColumnRange> getDetailRangeCount(ColumnQueryVO vo, String filter,
        String searchFilter, Integer distinctTotal, String highLightType) {
        List<ColumnRange> rangeCount = Lists.newArrayList();
        Integer step = distinctTotal / ColumnConstant.STAT_DETAIL_SEGMENT + 1;
        String col = vo.getName();
        String orderBy = null;
        //排序方式处理
        if (ColumnConstant.COUNT_SORT_TYPE.equals(vo.getSortType())) {
            orderBy = "num";
        } else if (ColumnConstant.DICTIONARY_SORT_TYPE.equals(vo.getSortType())) {
            orderBy = col;
        }
//        这里查询null值顺序不对,后面单独添加
        if (StringUtils.isEmpty(searchFilter)) {
            searchFilter = "WHERE \'" + col + "\' IS NOT NULL";
        }
        Connection conn = null;
        try {
            conn = gpDataProvider.getConn(DEFAULT_DATASET_ID);
            Statement st = conn.createStatement();
            String sql;
            ResultSet rs;
            //没有高亮的查询
            if (StringUtils.isEmpty(filter)) {
                sql = String.format(DETAIL_SQL, orderBy, vo.getSortVal(), step, col,
                    vo.getTable(), searchFilter, col);
                rs = st.executeQuery(sql);
                while (rs.next()) {
                    int low = rs.getInt(1) * step;
                    int upper = low + step;
                    rangeCount.add(new ColumnRange(low, upper, rs.getInt(2)));
                }
            } else {
                //高亮的查询
                sql = String.format(DETAIL_FILTER_SQL, orderBy, vo.getSortVal(), step, col,
                    filter.replace("WHERE", ""), vo.getTable(), searchFilter, col);
                rs = st.executeQuery(sql);
                while (rs.next()) {
                    int low = rs.getInt(1) * step;
                    int upper = low + step;
                    rangeCount.add(new ColumnRange(low, upper, rs.getInt(2), rs.getInt(3), highLightType));
                }
            }
            //按统计值排序的话。由于最后一个分段数量不足，去掉
            if (ColumnConstant.COUNT_SORT_TYPE.equals(vo.getSortType())
                && distinctTotal >= ColumnConstant.STAT_DETAIL_SEGMENT) {
                rangeCount.remove(rangeCount.size() - 1);
            }
        } catch (Exception e) {
            logger.error("TColumnService.getDetailRangeCount error, msg={}", e.getMessage());
        }
        finally {
            JDBCUtil.close(conn, null, null);
        }
        return rangeCount;
    }


    public JSONObject getBasicStat(ColumnQueryVO vo, String filter, String searchFilter) {
//        String defaultDataSourceKey = servletContext.getAttribute(Constant.DEFAULT_DATA_SOURCE_KEY).toString();
        String table = vo.getTable();
        String column = vo.getName();
        String formatCol = "";
        String key = StringUtil.generateHashKey(table, column, filter, searchFilter);
        Object cache = redisUtil.get("getBasicStat:" + key);
        if (null != cache) {
            logger.info("getBasicStat() using cache... ");
            return (JSONObject) cache;
        }else {
            logger.info("getBasicStat() not using cache...");
        }
        Integer total = null;
        Integer distinctTotal = null;
        Object max = null;
        Object min = null;
        Object topColumn = null;
        Integer topColumnCount = null;

        if (StringUtils.isNotEmpty(filter) && StringUtils.isNotEmpty(searchFilter)) {
            searchFilter = searchFilter.replace("WHERE", "");
        }
        Connection conn = null;
        if (vo.getType() == Types.DATE) {
            column = dateService.formatTime(vo.getTable(), vo.getName());
        } else {
            column = SqlUtil.formatPGSqlColName(column);
        }
        String sql = String
                .format("SELECT COUNT(1), MAX(%s), MIN(%s) FROM %s %s %s", column,
                        column, table, filter, searchFilter);
        try {
            conn = gpDataProvider.getConn(DEFAULT_DATASET_ID);
            Statement st = conn.createStatement();
            ResultSet rs = st.executeQuery(sql);
            if (rs.next()) {
                total = rs.getInt(1);
                max = rs.getObject(2);
                min = rs.getObject(3);
            }
            sql = String.format("select COUNT(*) from (select DISTINCT %s from %s %s %s) tmp", column, table, filter,
                    searchFilter);
            rs = st.executeQuery(sql);
            if (rs.next()) {
                distinctTotal = rs.getInt(1);
            }

            sql = String.format(
                "SELECT %s, COUNT(%s) FROM %s %s %s GROUP BY %s ORDER BY COUNT(%s) DESC LIMIT 1",
                column, column, table, filter, searchFilter, column, column);

            rs = st.executeQuery(sql);

            if (rs.next()) {
                topColumn = rs.getObject(1);
                topColumnCount = rs.getInt(2);
            }
        } catch (Exception e) {
            logger.error("TColumnService getBasicStat error,column={}", table + "." + column);
        }finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (Exception e) {

                }
            }
        }

        JSONObject ret = new JSONObject();
        ret.put("total", total);
        ret.put("distinctTotal", distinctTotal);
        ret.put("min", min);
        ret.put("max", max);
        ret.put("topColumn", topColumn);
        ret.put("topColumnCount", topColumnCount);
        if (total != 0) {
            redisUtil.set("getBasicStat:" + key, ret, 3600);
        }
        return ret;
    }

    /**
     * 摘要模式获取摘要的计数值
     */
    public List<ColumnRange> getRangeCount(ColumnQueryVO vo, List<Range> ranges, String filter, String highLightType) {
        List<ColumnRange> rangeCount = Lists.newArrayList();
        String caseWhenTemplate = "COUNT(CASE WHEN \"%s\" >= %s AND \"%s\" < %s THEN 1 END)";
        List<String> caseWhenList = Lists.newArrayList();
        int type = vo.getType();
        for (Range range : ranges) {
            String caseWhen;
            if (SqlUtil.isNumeric(type)) {
                caseWhen = String
                    .format(caseWhenTemplate, vo.getName(), range.lowerEndpoint(), vo.getName(),
                        range.upperEndpoint());
            } else {
                String lower = (String) range.lowerEndpoint();
                String upper = (String) range.upperEndpoint();
                //字符串增加'转义
                caseWhen = String
                    .format(caseWhenTemplate, vo.getName(),
                        "'" + lower.replaceAll("'", "''") + "'",
                        vo.getName(), "'" + upper.replaceAll("'", "''") + "'");
            }
            caseWhenList.add(caseWhen);
        }
        //cache
        String sql = String
            .format("SELECT %s FROM %s", Joiner.on(",").join(caseWhenList), vo.getTable());
        String sqlWithFilter = sql;
        if (StringUtils.isNotEmpty(filter)) {
            sqlWithFilter = sql + " " + filter;
        }

        Connection conn = null;
        Statement st = null;
        ResultSet rs = null;
        try {
            conn = gpDataProvider.getConn(DEFAULT_DATASET_ID);
            st = conn.createStatement();
            String key = StringUtil.generateHashKey(sql);
            Object cache = redisUtil.get("getRangeCount:" + key);
            if (null != cache) {
                List<ColumnRange> columnRanges = JSONArray.parseArray(cache.toString()).toJavaList(ColumnRange.class);
                rangeCount = columnRanges;
            }else {
                rs = st.executeQuery(sql);
                if (rs.next()) {
                    for (int i = 0; i < ranges.size(); i++) {
                        rangeCount.add(
                                new ColumnRange(ranges.get(i).lowerEndpoint(),
                                        ranges.get(i).upperEndpoint(),
                                        rs.getInt(i + 1)));
                    }
                    if (rangeCount.size() > 0) {
                        redisUtil.set("getRangeCount:" + key, JSONArray.toJSONString(rangeCount), 3600);
                    }
                }
            }

            if (StringUtils.isNotEmpty(filter)) {
                String key2 = StringUtil.generateHashKey(sqlWithFilter);
                Object cache2 = redisUtil.get("getRangeCount:" + key2);
                if (null != cache2) {
                    List<ColumnRange> columnRanges = JSONArray.parseArray(cache2.toString()).toJavaList(ColumnRange.class);
                    rangeCount = columnRanges;
                }else {
                    rs = st.executeQuery(sqlWithFilter);
                    if (rs.next()) {
                        for (int i = 0; i < ranges.size(); i++) {
                            if (highLightType.equals("highLight")) {
                                rangeCount.get(i).setHighLight(rs.getInt(i + 1));
                            } else if (highLightType.equals("anomaly")) {
                                rangeCount.get(i).setAnomaly(rs.getInt(i + 1));
                            } else if (highLightType.equals("imputation")) {
                                rangeCount.get(i).setImputation(rs.getInt(i + 1));
                            }
                        }
                    }
                    if (rangeCount.size() > 0) {
                        redisUtil.set("getRangeCount:" + key2, JSONArray.toJSONString(rangeCount), 3600);
                    }
                }
            }

        } catch (Exception e) {
            logger.error("TColumnService getRangeCount error,column={}",
                vo.getTable() + "." + vo.getName());
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (Exception e) {
                }
            }
        }
        return rangeCount;
    }

    /**
     * 详细模式的分页数据获取
     */
    public Page<ColumnCounter> getPageCount(ColumnQueryVO vo, ColumnStat stat, String filter,
        String searchFilter, String alignTable, String highLightType) {
        String defaultDataSourceKey = servletContext.getAttribute(Constant.DEFAULT_DATA_SOURCE_KEY).toString();
        Page<ColumnCounter> page = new Page<>();
        int curPage = vo.getCurPage();
        int pageSize = vo.getPageSize();

        int offset = (vo.getCurPage() - 1) * vo.getPageSize();

        page.setPageSize(pageSize);
        page.setCurPage(curPage);
        page.setTotalElementsAndPage(stat.getDistinctTotal());

        List<ColumnCounter> data = Lists.newArrayList();
        //这里查询null值顺序不对,后面单独添加
        if (StringUtils.isEmpty(searchFilter)) {
            if (StringUtil.hasChinese(vo.getName())) {
                searchFilter = "WHERE \"" + vo.getName() + "\"::TEXT IS NOT NULL";
            } else {
                searchFilter = "WHERE \"" + vo.getName() + "\" IS NOT NULL";
            }
        }
        Connection conn = null;
        String sql = "";
        try {
            if (vo.getType() == Types.DATE){
                sql = "SELECT \"%s\"::varchar, COUNT(\"%s\") FROM %s %s GROUP BY \"%s\" ORDER BY %s %s LIMIT %s OFFSET %s";
            } else {
                sql = "SELECT \"%s\", COUNT(\"%s\") FROM %s %s GROUP BY \"%s\" ORDER BY %s %s LIMIT %s OFFSET %s";
            }
//            conn = gpDataProvider.getConn(DEFAULT_DATASET_ID);
//            Statement st = conn.createStatement();

            if (vo.getSortType() == ColumnConstant.DICTIONARY_SORT_TYPE) {
                String col = vo.getName();
                String orderCol = SqlUtil.formatPGSqlColName(col);
                if (vo.getType() == Types.DATE) {
                    orderCol = dateService.formatTime(vo.getTable(), col);
                }
                sql = String
                        .format(sql, col, col, vo.getTable(), searchFilter,
                                col, orderCol, vo.getSortVal(), pageSize, offset);
            } else if (vo.getSortType() == ColumnConstant.COUNT_SORT_TYPE) {
                sql = String
                    .format(sql, vo.getName(), vo.getName(), vo.getTable(), searchFilter,
                        vo.getName(),
                        "COUNT(\"" + vo.getName() + "\")", vo.getSortVal(), pageSize,
                        offset);
            }
            JSONArray jsonArray = gpDataProvider.executeQuerySQL(defaultDataSourceKey, sql, JSONArray.class);

            List<LinkedHashMap> jsonObjects = jsonArray.toJavaList(LinkedHashMap.class);
//            ResultSet rs = st.executeQuery(sql);
            Integer type = vo.getType();

            for (LinkedHashMap item: jsonObjects){
                Object[] objects = item.values().toArray();
                ColumnCounter cc = new ColumnCounter((Comparable) objects[0], Integer.valueOf(objects[1].toString()));
                data.add(cc);
            }
//            while (rs.next()) {
//                ColumnCounter cc = null;
//                cc = new ColumnCounter((Comparable) rs.getObject(1), rs.getInt(2));
//                data.add(cc);
//            }

            if (StringUtils.isNotEmpty(filter)) {
                List<Object> list = Lists.newArrayList();
                boolean isNumber = SqlSyntaxHelper.isNumber(type);
                for (ColumnCounter item : data) {
                    if (isNumber) {
                        list.add(item.getColumn().toString());
                    } else {
                        list.add("'" + item.getColumn().toString().replaceAll("'", "''") + "'");
                    }
                }
                String filterSql = Joiner.on(",").join(list);
                sql = "SELECT \"%s\", COUNT(\"%s\") FROM %s %s AND \"%s\" IN (%s) GROUP BY \"%s\"";
                sql = String
                        .format(sql, vo.getName(), vo.getName(), vo.getTable(), filter,
                                vo.getName(), filterSql,
                                vo.getName());

                List<LinkedHashMap> jsonObjectList = gpDataProvider.executeQuerySQL(defaultDataSourceKey, sql, JSONArray.class).toJavaList(LinkedHashMap.class);
                Map<Object, Integer> map = Maps.newLinkedHashMap();
                for (LinkedHashMap item: jsonObjectList){
                    Object[] objects = item.values().toArray();
                    map.put(objects[0], Integer.valueOf(objects[1].toString()));
                }

//                while (rs.next()) {
//                    map.put(rs.getObject(1), rs.getInt(2));
//                }
                for (ColumnCounter item : data) {
                    Integer highLight = map.get(item.getColumn());
                    if (highLightType.equals("highLight")) {
                        item.setHighLight(highLight == null ? 0 : highLight);
                    } else if (highLightType.equals("anomaly")) {
                        item.setAnomaly(highLight == null ? 0 : highLight);
                    } else if (highLightType.equals("imputation")) {
                        item.setImputation(highLight == null ? 0 : highLight);
                    }
                }
            }

        } catch (Exception e) {
            logger.error("TColumnService getPageCount error,column={}, errMsg={}, sql={}",
                vo.getTable() + "." + vo.getName(), e.getMessage(), sql);
        }
        finally {
            logger.info("getPageCount SQL -> {}", sql);
            if (conn != null) {
                try {
                    conn.close();
                } catch (Exception e) {
                }
            }
        }

        try {
            JSONObject info = new JSONObject();
            TaskInstanceDTO taskInstance = taskInstanceService.queryInstanceByTableName(alignTable, vo.getTaskId());
            if (taskInstance == null){
                TaskDTO taskDTO = taskService.queryById(vo.getTaskId());
                info = JSONObject.parseObject(taskDTO.getDataJson()).getJSONArray("output")
                    .getJSONObject(0);
            } else {
                info = JSONObject.parseObject(taskInstance.getDataJson())
                    .getJSONObject("inputInfo").getJSONArray("output")
                    .getJSONObject(0);
            }
            if (vo.getSortType() == ColumnConstant.DICTIONARY_SORT_TYPE) {
                List<ColumnCounter> dataOrder = Lists.newArrayList();
                String sortOrder = vo.getSortVal();
                JSONArray order = new JSONArray();
                JSONArray categoryOrder = new JSONArray();
                if (info.getJSONObject("categoryOrder") != null
                        && info.getJSONObject("categoryOrder").getJSONArray(vo.getName()) != null) {
                    categoryOrder = info.getJSONObject("categoryOrder")
                        .getJSONArray(vo.getName());
                }
                if (categoryOrder.size() > 0) {
                    if (sortOrder.equals("ASC")) {
                        order = categoryOrder;
                    } else {
                        for (int i = categoryOrder.size() - 1; i >= 0; i--) {
                            order.add(categoryOrder.get(i));
                        }
                    }
                    for (Object col : order) {
                        for (ColumnCounter d : data) {
                            if (d.getColumn().toString().equals(col)) {
                                dataOrder.add(d);
                            }
                        }
                    }
                    data = dataOrder;
                }
            }
        } catch (Exception e){
            logger.warn("TColumnService getPageCount error, when ordering column, since {}", e.getMessage());
        }

        page.setData(data);
        return page;
    }

    public JSONObject queryDetail_kg(ColumnQueryVO vo, String sqlTBName) {
        String defaultDataSourceKey = servletContext.getAttribute(Constant.DEFAULT_DATA_SOURCE_KEY).toString();
        TaskDTO taskDTO = null;
        String sqlOrderBy = StringUtils.EMPTY;
        if (ObjectUtil.isNotNull(vo.getTaskId())){
            taskDTO = taskMapper.queryById(vo.getTaskId());
        }
        if (ObjectUtil.isNotNull(taskDTO) && (taskDTO.getType().equals(TaskTypeEnum.TASK_TYPE_ETL.getVal()))) {
            try {
                JSONObject data = JSONObject.parseObject(taskDTO.getDataJson());
                if (data.containsKey("algType")&&data.containsKey("sql")) {
                    //能进来的都是ETL->SQL 为啥要这个 看能不能去掉
                    String etlType = data.getString("algType");
                    if (ETLEnum.SQL.getVal().equals(Integer.valueOf(etlType))) {
                        String sqlStr = data.getString("sql").replaceAll("\\s+"," ");
                        Pattern pattern = Pattern.compile(" order by \\S+ [desc|asc]*");
                        Matcher matcher = pattern.matcher(sqlStr.toLowerCase());
                        if (matcher.find()) {
                            sqlOrderBy = matcher.group(0);
                        }
                    }
                }
            } catch (Exception e) {
                logger.info("unable to set table name");
            }
        } else {
            if (ObjectUtil.isNotNull(taskDTO) && (null == vo.getTable() || vo.getTable().isEmpty())){
                vo.setTable(TaskUtil.extractTableStr(taskDTO));
            }else {
                vo.setTable(ToolUtil.alignTableName(vo.getTable(), 0L));
            }
        }

        String name = vo.getName();
        String sortVal = vo.getSortVal();
        String filter = filter(vo, false);
        Page<JSONArray> page = new Page<>();
        int curPage = vo.getCurPage();
        int pageSize = vo.getPageSize();

        String tableName = vo.getTable();

        int offset = (vo.getCurPage() - 1) * vo.getPageSize();

        page.setPageSize(pageSize);
        page.setCurPage(curPage);

        String orderBy = "";
        if (StringUtils.isNotBlank(sqlOrderBy)) {
            orderBy = sqlOrderBy;
        } else if (StringUtils.isNotEmpty(name) && StringUtils.isNotEmpty(sortVal)) {
            orderBy = SqlUtil.formatPGSqlColName(name) + " " + sortVal;
        } else {
            orderBy = SqlUtil.formatPGSqlColName(DataCleanSqlHelper.IDCOLUMN_NAME) + " ASC";
        }
        JSONObject result = new JSONObject();
        Connection conn = null;
        Integer recordsAmount = 0;
        try {
            conn = gpDataProvider.getConn(DEFAULT_DATASET_ID);
            Statement st = conn.createStatement();
            //查询数据的总条数

            String sql = String.format(QUERY_TOTAL_SQL, vo.getTable(), filter);
            ResultSet rs = st.executeQuery(sql);
//            recordsAmount = gpDataService.executeQuerySQL(defaultDataSourceKey, sql, Integer.class).getResult();
            if (rs.next()) {
                recordsAmount = rs.getInt(1);
                page.setTotalElementsAndPage(recordsAmount);
            }

            // TSNE flag (TSNE算法 缺少 _record_id_列，需要增加)
//            Map<String, Integer> currDataColMap = gpDataProvider
//                .getColumnTypesOriginal(new Table(DEFAULT_DATASET_ID, vo.getTable()));

//            DataPattern dataPattern = new org.zjvis.datacenter.service.vo.table.Table(defaultDataSourceKey, vo.getTable().split("\\.")[0], vo.getTable().split("\\.")[1]);
//            List<Column> columnsInfo = gpDataService.queryTableMeta(defaultDataSourceKey, "select * from " + vo.getTable()).getResult();
////            Map<String, Integer> currDataColMap = columnsInfo.stream().collect(Collectors.toMap(Column::getLabel, Column::getType));
//            Map<String, Integer> currDataColMap = Maps.newLinkedHashMap();
//            for (Column col: columnsInfo){
////                System.out.println("col.getLabel(), col.getType()" + col.getLabel() + " ->" +  col.getType() );
//                currDataColMap.put(col.getName(), col.getType());
//            }
            Map<String, Integer> currDataColMap = gpDataProvider
                    .getColumnTypesOriginal(new Table(DEFAULT_DATASET_ID, vo.getTable()));
            boolean record_id_flag = false;
            if (currDataColMap.containsKey(DatasetConstant.DEFAULT_ID_FIELD)) {
                record_id_flag = true;
            }

            //推荐的预览
            if (null != vo.getData() && StringUtils
                    .isNotEmpty(vo.getData().getString(ColumnConstant.TRANSFORM_TYPE))) {
                Map<String, Integer> columnTypes = gpDataProvider
                    .getColumnTypesOriginal(new Table(DEFAULT_DATASET_ID, vo.getTable()));
//                DataPattern dataPattern2 = new org.zjvis.datacenter.service.vo.table.Table(defaultDataSourceKey, vo.getTable().split("\\.")[0], vo.getTable().split("\\.")[1]);
//                List<Column> columns = gpDataService.queryTableMeta(dataPattern2).getResult();
//                Map<String, Integer> columnTypes = Maps.newLinkedHashMap();
//                for (Column col : columns) {
//                    columnTypes.put(col.getLabel().toUpperCase(), col.getType());
//                }
                sql = TransFormSqlHelper.initPreviewSql(columnTypes, vo, orderBy, pageSize, offset);
            }
//            } else if (record_id_flag && recordsAmount < LIMIT_RECORDS) {
//                //TSNE flag 为了兼顾其他逻辑， 有_record_id_的 继续走这里 原来没有这个if
//                // 百万级数据查询 不走这里， orderby 很费时，
//                sql = String.format("SELECT * FROM %s %s ORDER BY %s LIMIT %s OFFSET %s",
//                        vo.getTable(), filter, orderBy, pageSize, offset);
//            } else {
//                //TSNE flag 没有_record_id_
//                sql = String.format("SELECT * FROM %s %s LIMIT %s OFFSET %s",
//                        vo.getTable(), filter, pageSize, offset);
//            }

            //针对 访客的数据条目限制
            if(vo.getNeedLimit()){ //[queryDetail]
                sql = String.format("SELECT * FROM %s %s  LIMIT %s", vo.getTable(), filter, Math.min(10, recordsAmount/2));
                page.setTotalElementsAndPage(Math.min(10, recordsAmount/2));
                logger.error("sql -> {}", sql);
            }

//            conn = gpDataProvider.getConn(DEFAULT_DATASET_ID);
//            Statement st = conn.createStatement();
//            //真实数据查询
//            ResultSet rs = st.executeQuery(sql);
            List<Column> columnList = gpDataProvider.queryTableMeta(defaultDataSourceKey, "select * from " + vo.getTable());
//            LinkedHashMap<String, Object> colsMeta = Maps.newLinkedHashMap();
            JSONArray queryResult = gpDataProvider.executeQuerySQL(defaultDataSourceKey, sql, JSONArray.class);
            ArrayList<Pair<String, String>> colsMeta = columnList.stream().map(column -> Pair.of(column.getName(), column.getTypeName())).collect(Collectors.toCollection(Lists::newArrayList));
//            ResultSetMetaData meta = rs.getMetaData();

            //生成结果的head结构
            JSONArray heads = new JSONArray();

            //若为清洗节点从taskInstance取元数据，否则从task取元数据，若出错则从数据库取元数据
            try {
                int taskType = taskDTO.getType();
                JSONObject info = new JSONObject();
                if (taskType == TaskTypeEnum.TASK_TYPE_CLEAN.getVal()) {
                    TaskInstanceDTO taskInstance = taskInstanceService.queryInstanceByTableName(tableName, vo.getTaskId());
                    info = TaskInstanceDTOUtil.getOutputArray(taskInstance).getJSONObject(0);
                } else {
                    info = JSONObject.parseObject(taskDTO.getDataJson()).getJSONArray("output")
                            .getJSONObject(0);
                }
                JSONObject semantics = info.getJSONObject("semantic");
                JSONArray tableCols = info.getJSONArray("tableCols");
                JSONArray columnTypes = info.getJSONArray("columnTypes");

                JSONObject numberFormat = null;
                if (info.containsKey("numberFormat")) {
                    numberFormat = info.getJSONObject("numberFormat");
                }

                JSONObject types = new JSONObject();
                for (int i = 0; i < tableCols.size(); i++) {
                    types.put(tableCols.getString(i), columnTypes.getString(i));
                }
//                heads = HeadUtil.wrapHead(meta, types, semantics, numberFormat);
                heads = HeadUtil.wrapHead(colsMeta, types, semantics, numberFormat);
            } catch (Exception e) {
                //没有taskDTO 就没有语义信息
                heads = HeadUtil.wrapBasicHead(colsMeta);
            }

//            List<String> colNames = IntStream.range(1, meta.getColumnCount() + 1).mapToObj(i -> {
//                try {
//                    return meta.getColumnName(i);
//                } catch (SQLException e) {
//                }
//                return org.apache.commons.lang.StringUtils.EMPTY;
//            }).filter(nameStr ->  !nameStr.contains(ColumnConstant.ROW_NUM)).collect(Collectors.toList());

            List<String> colNames = columnList.stream().filter(column -> !column.getName().contains(ColumnConstant.ROW_NUM)).map(Column::getName).collect(Collectors.toList());

            JSONArray ja = new JSONArray();
            /* 生成data结构 */
            long fake_id = 1l;
            List<Map> mapList = queryResult.toJavaList(Map.class);
            for (Map row : mapList) {
                JSONObject column = new JSONObject();
                for (String colName : colNames) {
                    column.put(colName, row.get(colName));
                }
                // TSNE flag
//                if (!record_id_flag) {
//                    column.put("_record_id_", fake_id++);
//                }
                // TSNE flag
                ja.add(column);
            }
//            while (rs.next()) {
//                JSONObject column = new JSONObject();
//                for (String colName : colNames) {
//                    column.put(colName, rs.getString(colName));
//                }
//                // TSNE flag
//                if (!record_id_flag) {
//                    column.put("_record_id_", fake_id++);
//                }
//                // TSNE flag
//                ja.add(column);
//            }
            result.put("head", heads);
            result.put("data", ja);
            result.put("page", page);
        } catch (Exception e) {
            logger.error("TColumnService queryDetail error,column={}, errMsg={}",
                    vo.getTable() + "." + vo.getName(), e.getMessage());
        }
//        finally {
//            JDBCUtil.close(conn, null, null);
//        }

        return result;
    }
    /**
     *
     * @param vo
     * @return
     */
    @SneakyThrows
    public JSONObject queryDetail(ColumnQueryVO vo) {
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        md5.update(StandardCharsets.UTF_8.encode(vo.toString()));
        String md5Code = "NEBULA_QUERY_" + String.format("%032x", new BigInteger(1, md5.digest()));
        if (!vo.getForceQuery()) {
            //拿缓存
            Object cacheResult = redisUtil.get(md5Code);
            if (ObjectUtil.isNotNull(cacheResult)) {
                //命中后延长过期时间
                redisUtil.expire(md5Code, CACHE_EXPIRE_TIME);
                logger.info(" [queryDetail] hit the cache when query table {} ", vo.getTable());
                return (JSONObject) cacheResult;
            }
        }
        TaskDTO taskDTO = null;
        if (ObjectUtil.isNotNull(vo.getTaskId())){
            taskDTO = taskMapper.queryById(vo.getTaskId());
        }
        boolean isSqlTask = false;
        if (ObjectUtil.isNotNull(taskDTO) && (taskDTO.getType().equals(TaskTypeEnum.TASK_TYPE_ETL.getVal()))) {
            JSONObject data = JSONObject.parseObject(taskDTO.getDataJson());
            if (data.containsKey("algType")&&data.containsKey("sql")) {
                isSqlTask = true;
            }
        } else {
            if (ObjectUtil.isNotNull(taskDTO) && (null == vo.getTable() || vo.getTable().isEmpty())){
                vo.setTable(TaskUtil.extractTableStr(taskDTO));
            }else {
                vo.setTable(ToolUtil.alignTableName(vo.getTable(), 0L));
            }
        }

        String name = vo.getName();
        String sortVal = vo.getSortVal();
        String filter = filter(vo, false);
        Page<JSONArray> page = new Page<>();
        int curPage = vo.getCurPage();
        int pageSize = vo.getPageSize();

        String tableName = vo.getTable();

        int offset = (vo.getCurPage() - 1) * vo.getPageSize();

        page.setPageSize(pageSize);
        page.setCurPage(curPage);

        String orderBy = "";
        if (StringUtils.isNotEmpty(name) && StringUtils.isNotEmpty(sortVal)) {
            orderBy = SqlUtil.formatPGSqlColName(name) + " " + sortVal + ","+ SqlUtil.formatPGSqlColName(DataCleanSqlHelper.IDCOLUMN_NAME) + " ASC";
        } else {
            orderBy = SqlUtil.formatPGSqlColName(DataCleanSqlHelper.IDCOLUMN_NAME) + " ASC";
        }
        JSONObject result = new JSONObject();
        Connection conn = null;
        Integer recordsAmount = 0;
        try {
            //查询数据的总条数
            conn = gpDataProvider.getConn(DEFAULT_DATASET_ID);
            Statement st = conn.createStatement();
            String sql = String.format(QUERY_TOTAL_SQL, vo.getTable(), filter);
            ResultSet rs = st.executeQuery(sql);
            if (rs.next()) {
                recordsAmount = rs.getInt(1);
                page.setTotalElementsAndPage(recordsAmount);
            }

            Map<String, Integer> currDataColMap = gpDataProvider
                    .getColumnTypesOriginal(new Table(DEFAULT_DATASET_ID, vo.getTable()));
            boolean record_id_flag = false;
            if (currDataColMap.containsKey(DatasetConstant.DEFAULT_ID_FIELD)) {
                record_id_flag = true;
            }

            //推荐的预览
            if (isSqlTask){
                sql = String.format("SELECT * FROM %s %s LIMIT %s OFFSET %s",
                    vo.getTable(), filter, pageSize, offset);
            } else if (null != vo.getData() && StringUtils
                .isNotEmpty(vo.getData().getString(ColumnConstant.TRANSFORM_TYPE))) {
                Map<String, Integer> columnTypes = gpDataProvider
                    .getColumnTypesOriginal(new Table(DEFAULT_DATASET_ID, vo.getTable()));
                sql = TransFormSqlHelper.initPreviewSql(columnTypes, vo, orderBy, pageSize, offset);
            } else if (record_id_flag && recordsAmount < LIMIT_RECORDS) {
                //TSNE flag 为了兼顾其他逻辑， 有_record_id_的 继续走这里 原来没有这个if
                // 百万级数据查询 不走这里， orderby 很费时，
                sql = String.format("SELECT * FROM %s %s ORDER BY %s LIMIT %s OFFSET %s",
                    vo.getTable(), filter, orderBy, pageSize, offset);
            } else {
                //TSNE flag 没有_record_id_
                sql = String.format("SELECT * FROM %s %s LIMIT %s OFFSET %s",
                    vo.getTable(), filter, pageSize, offset);
            }

            //针对 访客的数据条目限制
            if(vo.getNeedLimit()){ //[queryDetail]
                sql = String.format("SELECT * FROM %s %s  LIMIT %s", vo.getTable(), filter, Math.min(10, recordsAmount/2));
                page.setTotalElementsAndPage(Math.min(10, recordsAmount/2));
                logger.error("sql -> {}", sql);
            }

            rs = st.executeQuery(sql);
            ResultSetMetaData meta = rs.getMetaData();

            //生成结果的head结构
            JSONArray heads = new JSONArray();

            //若为清洗节点从taskInstance取元数据，否则从task取元数据，若出错则从数据库取元数据
            try {
                int taskType = taskDTO.getType();
                JSONObject info = new JSONObject();
                if (taskType == TaskTypeEnum.TASK_TYPE_CLEAN.getVal()) {
                    TaskInstanceDTO taskInstance = taskInstanceService.queryInstanceByTableName(tableName, vo.getTaskId());
                    info = TaskInstanceDTOUtil.getOutputArray(taskInstance).getJSONObject(0);
                } else {
                    info = JSONObject.parseObject(taskDTO.getDataJson()).getJSONArray("output")
                        .getJSONObject(0);
                }
                JSONObject semantics = info.getJSONObject("semantic");
                JSONArray tableCols = info.getJSONArray("tableCols");
                JSONArray columnTypes = info.getJSONArray("columnTypes");

                JSONObject numberFormat = null;
                JSONObject dateFormat = null;
                if (info.containsKey("numberFormat")) {
                    numberFormat = info.getJSONObject("numberFormat");
                }
                if (info.containsKey("dateFormat")) {
                    dateFormat = info.getJSONObject("dateFormat");
                }

                JSONObject types = new JSONObject();
                for (int i = 0; i < tableCols.size(); i++) {
                    types.put(tableCols.getString(i), columnTypes.getString(i));
                }
                heads = HeadUtil.wrapHead(meta, types, semantics, numberFormat, dateFormat);
            } catch (Exception e) {
                //没有taskDTO 就没有语义信息
                heads = HeadUtil.wrapBasicHead(meta);
            }

            List<String> colNames = IntStream.range(1, meta.getColumnCount() + 1).mapToObj(i -> {
                try {
                    return meta.getColumnName(i);
                } catch (SQLException e) {
                }
                return org.apache.commons.lang.StringUtils.EMPTY;
            }).filter(nameStr ->  !nameStr.contains(ColumnConstant.ROW_NUM)).collect(Collectors.toList());

            JSONArray ja = new JSONArray();
            /* 生成data结构 */
            long fake_id = 1l;
            while (rs.next()) {
                JSONObject column = new JSONObject();
                for (String colName : colNames) {
                    column.put(colName, rs.getString(colName));
                }
                // TSNE flag
                if (!record_id_flag) {
                    column.put("_record_id_", fake_id++);
                }
                // TSNE flag
                ja.add(column);
            }

            result.put("head", heads);
            result.put("data", ja);
            result.put("page", page);
        } catch (Exception e) {
            redisUtil.del(md5Code);
            logger.error("TColumnService queryDetail error,column={}, errMsg={}",
                vo.getTable() + "." + vo.getName(), e.getMessage());
        }finally {
            JDBCUtil.close(conn, null, null);
        }
        redisUtil.set(md5Code, result, CACHE_EXPIRE_TIME);
        return result;
    }

    private String filter(ColumnQueryVO vo, boolean isSearch) {
        JSONArray filter;
        if (isSearch) {
            filter = vo.getSearchFilter();
        } else {
            filter = vo.getFilter();
        }
        if (CollectionUtil.isEmpty(filter)) {
            return "";
        }
        JSONObject data = new JSONObject();
        data.put("filter", filter);
        List<ConfigComponent> configComponents = DataCleanSqlHelper.parseFilter(data);
        SqlHelper sqlHelper = new SqlHelper();
        Table table = new Table(DEFAULT_DATASET_ID, vo.getTable());
        String defaultDataSourceKey = servletContext.getAttribute(Constant.DEFAULT_DATA_SOURCE_KEY).toString();
        List<Column> columnsInfo = gpDataProvider.queryTableMeta(defaultDataSourceKey, "SELECT * FROM " + table.getName() + " WHERE 1=0");
        Map<String, Integer> columnTypes = Maps.newLinkedHashMap();
        for (Column col: columnsInfo){
            columnTypes.put(col.getLabel().toUpperCase(), col.getType());
        }
        sqlHelper.bind(columnTypes);
        String sql = sqlHelper.assembleFilter(configComponents);
        //整数的模糊匹配
        if (isSearch) {
            if (SqlSyntaxHelper.isNumber(vo.getType())) {
                JSONArray f1 = filter.getJSONArray(0);
                JSONObject filterObj = f1.getJSONObject(0);
                String col = SqlUtil.formatPGSqlColName(filterObj.getString("col"));
                sql = sql.replaceAll(col, String.format("CAST(%s as text)", col));
            }
        }
        //以下将列名中的"转义，及加"防止和保留字段冲突
        sql = DataCleanSqlHelper.formatPGSqlColName(filter, sql);
        return sql;
    }

    @Transactional(rollbackFor = Exception.class)
    public TaskInstanceVO addAction(TaskInstanceVO vo) {
        JSONObject voData = vo.getData();
        TaskDTO currentTask = taskMapper.queryById(vo.getTaskId());
        List<TaskInstanceDTO> actions = taskInstanceService.queryByTaskIdAndOrder(vo.getTaskId());
        //先删除action操作
        this.clearActions(actions, vo.getTaskId());
        //将当前要添加的action加入到action列表中
        int index = 0;
        for (int i = 0; i < actions.size(); i++) {
            TaskInstanceDTO dto = actions.get(i);
            JSONObject data = JSONObject.parseObject(dto.getDataJson());
            Long id = data.getLong("id");
            if (id.equals(voData.getLong("id"))) {
                index = i;
            }
        }
        actions.add(index + 1, vo.toTask(vo));

        //提取出父节点的输出完整表名
        String table = getParentOutTable(currentTask);
        //重新生成action
        String logInfo = rebuildActions(actions, vo.getTaskId(), table);
        if (StringUtils.isEmpty(logInfo)) {
            TaskInstanceDTO addAction = actions.get(index + 1);
            //记录到撤销栈
            actionService.addCleanUpUndo(ActionEnum.CLEAN_ADD,
                    PipelineUtil.parseJSONObject(addAction, index + 1),
                    null, currentTask.getPipelineId());
            return addAction.view();
        } else {
            System.out.println("addAction failed -> " + logInfo);
            return actions.get(actions.size() - 1).view();
        }

    }

    @Transactional(rollbackFor = Exception.class)
    public void clearActions(List<TaskInstanceDTO> actions, Long taskId) {
        if (actions == null || actions.isEmpty()) {
            return;
        } else if (actions.size() > 1) {
            Long id = actions.get(1).getId();//清理第二条生成的表
            TaskInstanceDTO taskInstanceDTO = taskInstanceMapper.queryById(id);
            String table = JSONObject.parseObject(taskInstanceDTO.getDataJson()).getString("table");
            //TODO 删除时机不对， 部分节点 在执行过程中，仍会请求已经被删除的表，先注掉
//            if (!table.split("\\.")[0].equals(GREEN_PLUM_DEFAULT_SCHEMA)) {
//                gpDataProvider.dropRedundantTables(table, true);
//            }
        }
        taskInstanceService.deleteActions(taskId, actions.get(0).getId());
    }

    public String getModelTaskOutputTable(TaskDTO taskDTO) {
        try {
            String dataJson = taskDTO.getDataJson();
            JSONObject data = JSONObject.parseObject(dataJson);
            JSONArray outputarray = data.getJSONArray("output");
            JSONObject output = outputarray.getJSONObject(0);
            String otableName = output.getString("tableName");
            if (!otableName.contains(".")) {
                otableName = "dataset." + otableName;
            }
            return otableName;
        } catch (Exception e) {
            logger.error("unable to set table name");
            return "";
        }
    }

    public String getParentOutTable(TaskDTO currentTask) {
        if (StringUtils.isEmpty(currentTask.getParentId())) {
            throw new DataScienceException("没有连接父节点，无清洗数据来源");
        }
        TaskDTO parentTask = taskService.queryById(Long.valueOf(currentTask.getParentId()));

        if (parentTask.getType() == TaskTypeEnum.TASK_TYPE_MODEL.getVal()) {
            return getModelTaskOutputTable(parentTask);
        }
        if (parentTask.getType() == TaskTypeEnum.TASK_TYPE_DATA.getVal()) {
            // 数据节点
            JSONObject data = parentTask.view().getData();
            JSONArray output = data.getJSONArray("output");
            JSONObject item = output.getJSONObject(0);
            return item.getString("tableName");
        }
        TaskInstanceDTO parentIns = taskInstanceService
            .queryLatestInstanceForTask(Long.valueOf(currentTask.getParentId()));
        if (parentIns == null) {
            return StringUtils.EMPTY;
        }
        JSONArray parentOutPut = PipelineUtil.extractOutPut(parentTask, parentIns);
        String table = "";
        for (int i = 0; i < parentOutPut.size(); i++) {
            if (!parentOutPut.getJSONObject(i).getString("tableName").contains("model")) {
                table = parentOutPut.getJSONObject(i).getString("tableName");
            }
        }
        return table;
    }

    public TaskInstanceDTO checkFormula(TaskInstanceVO vo) {
        JSONObject data = vo.getData();
        String sourceTable = data.getString("table");
        boolean edit = data.getBoolean("isEdit");
        sourceTable = ToolUtil.alignTableName(sourceTable, 0L);
        Table table = new Table(DEFAULT_DATASET_ID, sourceTable);
        Map<String, Integer> columnTypes = gpDataProvider.getColumnTypesOriginal(table);
        String sql;
        vo.setStatus(TaskInstanceStatus.FAIL.toString());
        Connection conn = null;
        Statement st;
        ResultSet rs;
        try {
            sql = DataCleanSqlHelper
                .initAddColumnBasedOnFormulaSql(columnTypes, data, vo.getTaskId(), edit);
            if (StringUtils.isNotEmpty(sql)) {
                conn = gpDataProvider.getConn(DEFAULT_DATASET_ID);
                st = conn.createStatement();
                rs = st.executeQuery(sql);
                rs.next();
            }
            vo.setLogInfo(null);
            vo.setStatus(TaskInstanceStatus.SUCCESS.toString());
        } catch (Exception e) {
            vo.setLogInfo(String.format(Constant.errorTpl, e.getMessage().replaceAll("\"", "'")));
        } finally {
            JDBCUtil.close(conn, null, null);
        }
        return vo.toTask(vo);
    }

    public Map<String, Integer> getDataJsonColumnTypes(JSONObject data, TaskInstanceVO vo){
        Long taskId = vo.getTaskId();
        Map<String, Integer> ret = new LinkedHashMap<>();
        if (taskId != null && taskId != 0L && JSONObject.parseObject(taskService.queryById(taskId).getDataJson())
            .getJSONArray("output").size() > 0) {
            TaskDTO taskDTO = taskService.queryById(taskId);
            JSONObject dataJson = JSONObject.parseObject(taskDTO.getDataJson());
            JSONArray outputs = dataJson.getJSONArray("output");
            JSONObject output = outputs.getJSONObject(0);
            if (data.getString("action").equals("ORIGINAL_DATA")) {
                try {
                    output = JSONObject
                            .parseObject(dataJson.getJSONArray("input").getJSONObject(0).toJSONString());
                    outputs.set(0, output);
                }catch (Exception e){
                    throw DataScienceException.of(BaseErrorCode.PARENT_INPUT_INFO_NOT_EXIST);
                }
            }
            JSONArray tableCols = output.getJSONArray("tableCols");
            JSONArray columnTypes = output.getJSONArray("columnTypes");
            for (int i = 0; i < tableCols.size(); i++){
                ret.put(tableCols.getString(i), SqlUtil.encodeType(columnTypes.getString(i)));
            }
        } else {
            String sourceTable = data.getString("table");
            sourceTable = ToolUtil.alignTableName(sourceTable, 0L);
            Table table = new Table(DEFAULT_DATASET_ID, sourceTable);
            ret = gpDataProvider.getColumnTypesOriginal(table);
        }
        return ret;
    }

    public TaskInstanceDTO addOneAction(JSONObject data, TaskInstanceVO vo) {
        String defaultDataSourceKey = servletContext.getAttribute(Constant.DEFAULT_DATA_SOURCE_KEY).toString();
        Map<String, Integer> columnTypes = getDataJsonColumnTypes(data, vo);
        ActionEnum actionEnum = ActionEnum.valueOf(data.getString("action"));
        String transformType = data.getString(ColumnConstant.TRANSFORM_TYPE);

        if (SPLIT.equals(actionEnum)) {
            int splitType = data.getInteger("splitType");
            if (splitType == ColumnConstant.SPLIT_ALL) {
                String baseTable = data.getString("table");
                List<Column> columnsInfo = gpDataProvider.queryTableMeta(defaultDataSourceKey, "SELECT * FROM " + baseTable + " LIMIT 1");
                List<String> columnsName=new ArrayList();
                for (Column col: columnsInfo){
                    columnsName.add(col.getName());
                }
                data.put("columnsName",columnsName);
                String sql = DataCleanSqlHelper.initQueryMaxSplitSql(data);
                int maxSplit = queryMaxSplit(sql);
                data.put("maxSplit", maxSplit);
            }
        }
        if (ColumnConstant.WORD_INITIAL.equals(transformType) ||
                ColumnConstant.WORD_REMOVE.equals(transformType)) {
            String sql = DataCleanSqlHelper.initQueryWordNumberSql(data);
            int maxSplit = queryMaxSplit(sql);
            data.put("maxSplit", maxSplit);
        }
        String sql = null;
        vo.setStatus(TaskInstanceStatus.FAIL.toString());

        boolean check = true;
        if (ADD_COLUMN_BASED_ON_FORMULA.equals(actionEnum)) {
            ResultSet rs = null;
            try {
                String baseTable = data.getString("table");
                List<Column> columnsInfo = gpDataProvider.queryTableMeta(defaultDataSourceKey, "SELECT * FROM " + baseTable + " LIMIT 1");
                System.out.println("columnsInfo -> "+ columnsInfo);
                Map<String, Integer> newColumnTypes = Maps.newLinkedHashMap();
                for (Column col: columnsInfo){
                    newColumnTypes.put(col.getLabel(), col.getType());
                }
                columnTypes = newColumnTypes;
                String formulaSql = DataCleanSqlHelper
                    .initAddColumnBasedOnFormulaSql(newColumnTypes, data, vo.getTaskId(), false);
                System.out.println("formula SQL -> "+ formulaSql);
                gpDataProvider.executeSql(formulaSql);
            } catch (Exception e) {
                logger.error("addOneAction [add col by formula] failed since = {}", e.getMessage(), e);
                check = false;
                vo.setStatus(TaskInstanceStatus.FAIL.toString());
                vo.setLogInfo(String.format(Constant.errorTpl, BaseErrorCode.FORMULA_EXECUTE_FAILED.getMsg() + ",请检查上一步操作，是否可以应用此公式。"));
            }
        }
        if (TYPE_TRANSFORM.equals(actionEnum) && !semanticService.validateTypeTransform(data, columnTypes)){
            String errorMsg = String.format(Constant.errorTpl, String.format(
                "%s 不能全部转成 %s", Joiner.on(",").join(data.getJSONArray("col")),
                data.getString("toType")));
            vo.setLogInfo(errorMsg);
            check = false;
        }
        if (check) {
            try {
                String baseTable = data.getString("table");
                baseTable = ToolUtil.alignTableName(baseTable, 0L);
                List<Column> columnsInfo = gpDataProvider.queryTableMeta(defaultDataSourceKey, "SELECT * FROM " + baseTable + " LIMIT 1");
                Map<String, Integer> newColumnTypes = Maps.newLinkedHashMap();
                for (Column col: columnsInfo){
                    newColumnTypes.put(col.getLabel(), col.getType());
                }
                columnTypes = newColumnTypes;
                //创建sql语句
                if (SEMANTIC_TRANSFORM.equals(actionEnum)){
                    sql = semanticService.semanticTransform(columnTypes, data, vo.getTaskId());
                } else if (FILTER.equals(actionEnum)) {
                    JSONObject data2 = JSONObject.parseObject(data.toJSONString());
                    JSONObject filter = data2.getJSONArray("filter").getJSONArray(0).getJSONObject(0);
                    filter.put("includeNull",false);

                    String filterType = filter.getString("filterType");
                    switch (filterType) {
                        case "a%-b%":
                        case "0%-b%":
                        case "a%-100%":
                            if (data2.getJSONArray("filter").getJSONArray(0).size()==2) {
                                JSONObject filter2 = data2.getJSONArray("filter").getJSONArray(0).getJSONObject(1);
                                String filterType2 = filter2.getString("filterType");
                                if ("=".equals(filterType2)) {
                                    filter.put("includeNull",true);
                                    data2.getJSONArray("filter").getJSONArray(0).remove(1);
                                }
                            }
                            String table = data2.getString("table");
                            String col = SqlUtil.formatPGSqlColName(data2.getString("col"));
                            String countSql = "select count(*) from "+table+" where "+col+" is not null";
                            Integer count = gpDataProvider.executeQuerySQL(defaultDataSourceKey, countSql, Integer.class);
                            if (ObjectUtil.isNotNull(count)) {
                                JSONArray values = filter.getJSONArray("values");
                                Double v0 = 0.0;
                                Double v1 = 100.0;
                                if ("a%-b%".equals(filterType)) {
                                    v0 = Double.valueOf(values.getString(0));
                                    v1 = Double.valueOf(values.getString(1));
                                } else if ("0%-b%".equals(filterType)) {
                                    v1 = Double.valueOf(values.getString(0));
                                } else if ("a%-100%".equals(filterType)) {
                                    v0 = Double.valueOf(values.getString(0));
                                }
                                if (v0<0||v1>100) {
                                    throw new DataScienceException("参数范围错误，参数一不能小于1参数二不能大于100");
                                } else if (v0>v1) {
                                    throw new DataScienceException("参数范围错误，参数一不能大于1参数二");
                                }
                                long limit;
                                if (v1==100) {
                                    limit = -1;
                                } else if (v0==0) {
                                    limit = Math.round(count*(v1-v0)/100f);
                                } else {
                                    limit = Math.round(count*(v1-v0)/100f)+1;
                                }
                                long offset = Math.round(count*v0/100f);
                                offset = offset<1?0:offset-1;
                                values.set(0,limit);
                                values.set(1,offset);
                            }
                            sql = DataCleanSqlHelper.initSql(columnTypes, data2, vo.getTaskId());
                            data.put("table",data2.getString("table"));
                            data.put("label", data2.getString("label"));
                            break;
                        default:
                            sql = DataCleanSqlHelper.initSql(columnTypes, data, vo.getTaskId());
                            break;
                    }
                } else {
                    sql = DataCleanSqlHelper.initSql(columnTypes, data, vo.getTaskId());
                }
                if (sql.startsWith("error: ")){
                    vo.setStatus(TaskInstanceStatus.FAIL.toString());
                    vo.setLogInfo(String.format(Constant.errorTpl, sql));
                } else {
                    logger.info("addOneAction, sql = {}", sql);
                    if (StringUtils.isNotEmpty(sql)) {
                        gpDataProvider.executeSql(sql);
                    } else if (!ORIGINAL_DATA.equals(actionEnum)){
                        throw DataScienceException.of(BaseErrorCode.ERROR, actionEnum.name()+"失败", null);
                    }
                    vo.setLogInfo(null);
                    vo.setStatus(TaskInstanceStatus.SUCCESS.toString());
                    if (TYPE_TRANSFORM.equals(actionEnum) && StringUtils.isNotEmpty(sql)) {
                        JSONArray cols = data.getJSONArray("col");
                        List<String> wrappedCols = Lists.newArrayList();
                        for (int i = 0; i < cols.size(); i++) {
                            wrappedCols.add(String.format("count(\"%s\")", cols.getString(i)));
                        }
                        try {
                            String query = String
                                .format("select %s from %s", Joiner.on(",").join(wrappedCols),
                                    data.getString("table"));
                            gpDataProvider.executeQuerySQL(defaultDataSourceKey, query, JSONArray.class);
                        } catch (Exception e) {
                            String errorMsg = String.format(Constant.errorTpl, String.format(
                                "%s 不能全部转成 %s", Joiner.on(",").join(cols),
                                data.getString("toType")));
                            logger.error("Clean action [Type_Transform] failed, since {}",
                                e.getMessage());
                            vo.setStatus(TaskInstanceStatus.FAIL.toString());
                            vo.setLogInfo(errorMsg);
                        }
                    }
                }
            } catch (Exception e) {
                logger.error("addOneAction failed since = {}", e.getMessage(), e);
                vo.setLogInfo(
                    String.format(Constant.errorTpl, e.getMessage().replaceAll("\"", "'")));
            }

        }

        //保存action
        vo.setSqlText(sql);
        if (null == vo.getParentId()) {
            vo.setParentId(String.valueOf(vo.getTaskId()));
        }

        if (vo.getLogInfo() == null) {
            updateActionOutput(vo, data);
        }
        vo.setPipelineId(vo.getTaskId());
        vo.setType(TaskTypeEnum.TASK_TYPE_CLEAN.getVal());

        TaskInstanceDTO taskInstanceDTO = vo.toTask(vo);
        taskInstanceDTO.setGmtCreate(LocalDateTime.now());
        taskInstanceDTO.setGmtCreator(JwtUtil.getCurrentUserId());
        taskInstanceDTO.setGmtModifier(JwtUtil.getCurrentUserId());
        taskInstanceDTO.setId(null);
        taskInstanceMapper.save(taskInstanceDTO);

        data.put("id", taskInstanceDTO.getId());
        data.put("status", taskInstanceDTO.getStatus());
        data.put("logInfo", taskInstanceDTO.getLogInfo());
        data.put("gmtCreate", taskInstanceDTO.getGmtCreate()
            .format(DateTimeFormatter.ofPattern(TimeUtil.FULL_DATE_FORMAT)));
        taskInstanceDTO.setDataJson(data.toJSONString());
        taskInstanceMapper.update(taskInstanceDTO);
        return taskInstanceDTO;
    }

    /**
     * 更新action语义和数据类型
     */
    private void updateActionOutput(TaskInstanceVO vo, JSONObject data) {
        Long taskId = vo.getTaskId();
        TaskDTO taskDTO = taskService.queryById(taskId);
        if (taskId != null && taskId != 0L && JSONObject.parseObject(taskDTO.getDataJson())
            .getJSONArray("output").size() > 0) {
            JSONObject dataJson = JSONObject.parseObject(taskDTO.getDataJson());
            JSONArray outputs = dataJson.getJSONArray("output");
            JSONObject output = outputs.getJSONObject(0);

            JSONObject numberFormat = output.getJSONObject("numberFormat");

            if (data.getString("action").equals("ORIGINAL_DATA")) {
                output = JSONObject
                    .parseObject(dataJson.getJSONArray("input").getJSONObject(0).toJSONString());
                outputs.set(0, output);
            }
            JSONObject semantics = output.getJSONObject("semantic");
            JSONArray oldTableCols = output.getJSONArray("tableCols");
            List<String> oldColumnTypes = output.getJSONArray("columnTypes")
                .toJavaList(String.class);
            ActionEnum actionEnum = ActionEnum.valueOf(data.getString("action"));

            JSONObject kv = new JSONObject();
            List<String> newNames = new ArrayList<>();
            List<String> newColumns = new ArrayList<>();
            for (int i = 0; i < oldTableCols.size(); i++) {
                kv.put(oldTableCols.getString(i), oldColumnTypes.get(i));
            }

            JSONObject categoryOrder = new JSONObject();
            if (output.getJSONObject("categoryOrder") != null) {
                categoryOrder = output.getJSONObject("categoryOrder");
            }

            String targetTable = data.getString("table");
            String[] split = targetTable.split("\\.");

//            String dataSourceKey = (String) servletContext.getAttribute(Constant.DEFAULT_DATA_SOURCE_KEY);
//            DataPattern dataPattern = new org.zjvis.datacenter.service.vo.table.Table(dataSourceKey, split[0], split[1]);
//            RpcResult<Map<String, String>> mapRpcResult = gpDataService.queryTableMetaMap(dataPattern);
//            //方法签名返回Map 但真的有可能变成List
//            Map<String, String> tableMetaMap = null;
//            if (mapRpcResult.getResult() instanceof List){
//                List temp  = (List)mapRpcResult.getResult();
//                tableMetaMap = (Map<String, String>) temp.stream().collect(Collectors.toMap(Column::getName, Column::getTypeName));
//            }else if (mapRpcResult.getResult() instanceof Map){
//                tableMetaMap = mapRpcResult.getResult();
//            }


            Map<String, String> tableMetaMap = gpDataProvider.getTableMetaMap(split[1], split[0]);
            for (Map.Entry<String, String> entry : tableMetaMap.entrySet()) {
                String name = entry.getKey();
                if (kv.getString(name) != null) {
                    String type = kv.getString(name);
                    if (type.equals(DataTypeEnum.JSON.name()) || type
                        .equals(DataTypeEnum.ARRAY.name()) || type
                        .equals(DataTypeEnum.DATE.name())) {
                        tableMetaMap.put(name, type);
                    }
                } else {
                    newNames.add(name);
                    newColumns.add(SqlUtil.changeType(entry.getValue()));
                }
            }

            switch (actionEnum) {
                case TYPE_TRANSFORM: {
                    JSONArray changedCols = data.getJSONArray("col");
                    if (vo.getStatus().equals(TaskInstanceStatus.FAIL.toString())) {
                        //类型转换失败时，回滚操作
                        for (Object col : changedCols) {
                            String oldType = oldColumnTypes.get(oldTableCols.indexOf(col));
                            tableMetaMap.put((String) col, oldType);
                        }
                    } else {
                        String toType = data.getString("toType");
                        for (int i = 0; i < changedCols.size(); i++) {
                            List<String> newCols = (List<String>) data.get("columnName");
                            if (toType.equals("date") && data.getJSONArray("semantic") != null && data.getJSONArray("semantic").getString(i).equals("id")){
                                String newCol = newCols.get(i);
                                tableMetaMap.put(newCol, toType.toUpperCase());
                            } else {
                                tableMetaMap.put(changedCols.getString(i), toType.toUpperCase());
                            }
                        }
                    }
                    break;
                }
                case RENAME:
                {
                    String oldColName = data.getString("col");
                    String newColName = data.getString("newCol");
                    int index = oldTableCols.indexOf(oldColName);
                    String oldColType = oldColumnTypes.get(index);
                    tableMetaMap.put(newColName, oldColType);
                    semantics.put(newColName, semantics.get(oldColName));
                    categoryOrder.put(newColName, categoryOrder.getJSONArray(oldColName));
                    break;
                }
                case NUMBER_FORMAT_CONVERSION:
                    String col = data.getString("col");
                    String subAction = data.getString("subAction");
                    Integer digit = data.getInteger("digit");

                    if (numberFormat == null) {
                        numberFormat = new JSONObject();
                    }

                    JSONObject jo2 = new JSONObject();
                    jo2.put("digit", digit);
                    jo2.put("action", subAction);
                    numberFormat.put(col,jo2);

                    output.put("numberFormat", numberFormat);
                    break;
                case COPY:
                case FORMAT_TRANSFORM:
                {
                    String oldColName = data.getString("col");
                    String newColName = data.getString("columnName");
                    if (newColName == null){
                        newColName = data.getString("newCol");
                    }
                    String oldColType = tableMetaMap.get(oldColName);
                    tableMetaMap.put(newColName, oldColType);
                    semantics.put(newColName, semantics.get(oldColName));
                    categoryOrder.put(newColName, categoryOrder.getJSONArray(oldColName));
                    JSONObject dateFormat = new JSONObject();
                    dateFormat.put(newColName,data.getString("format"));
                    output.put("dateFormat", dateFormat);
                    break;
                }
                case SEMANTIC_TRANSFORM: {
                    String semantic = data.getString("toSemantic");
                    String newColName = data.getString("columnName");
                    if (semantic.equals("null")){
                        semantics.put(data.getString("col"), semantic);
                    } else {
                        semantics.put(newColName, semantic);
                    }
                    break;
                }
                case ADD_COLUMN_BASED_ON_FORMULA:
                {
                    if (data.getString("formula") != null && data.getString("formula").contains("string(")){
                        break;
                    }
                }
                default: {
                    if (newNames.size() > 0) {
                        List<HeadVO> heads = new ArrayList<>();
                        for (String name : newNames) {
                            heads.add(HeadVO.builder()
                                .name(name)
                                .type(SqlUtil.changeType(tableMetaMap.get(name)))
                                .build());
                        }
                        newNames = SqlUtil.formatPGSqlCols(newNames);
                        String selectSql = String.format("select distinct %s from %s limit 5000",
                            Joiner.on(",").join(newNames.iterator()), targetTable);
//                        JSONArray meta = gpDataService.executeQuerySQL(dataSourceKey, selectSql, JSONArray.class).getResult();
                        JSONArray meta = gpDataProvider.executeQuery(selectSql);
                        List<Entity> values = JSONArray.parseArray(meta.toJSONString(), Entity.class);

                        DatasetUtil.recommendDataType(heads, values);
                        boolean flag = false;
                        for (int i = 0; i < heads.size(); i++) {
                            if (!heads.get(i).getType().equals(newColumns.get(i))) {
                                flag = true;
                            }
                        }
                        if (flag) {//若数据库识别的类型与平台识别的类型不一致则使用平台识别的类型
                            List<String> cols = Lists.newLinkedList(tableMetaMap.keySet());
                            cols = SqlUtil.formatPGSqlCols(cols);
                            for (HeadVO head : heads) {
                                tableMetaMap.put(head.getName(), head.getType().toUpperCase());
                                String colName = SqlUtil.formatPGSqlColName(head.getName());
                                int index = cols.indexOf(colName);
                                cols.remove(index);
                                cols.add(index, String.format(DataCleanSqlHelper.TRANSFORM_SQL, colName,
                                    DataTypeEnum.get(head.getType()).getGpType()));
                            }
                            selectSql = DataCleanSqlHelper
                                .buildSelectSql(cols.iterator(), targetTable);
                            String newTable = String
                                .format(DataCleanSqlHelper.VIEW_TABLE_NAME, taskId,
                                    System.currentTimeMillis());
                            String alterSql = String
                                .format(SqlTemplate.CREATE_VIEW_SQL, newTable, selectSql);
                            gpDataProvider.executeSql(alterSql);
//                            gpDataService.executeUpdateSQL(dataSourceKey, alterSql);
                            data.put("table", newTable);
                        }

                        if (!actionEnum.equals(SEMANTIC_TRANSFORM)) {
                            semanticService.recommendSemantic(heads,
                                values.subList(0, Math.min(20, values.size())), true);
                            for (HeadVO head : heads) {
                                semantics.put(head.getName(), head.getRecommendSemantic());
                            }
                        }
                    }
                }
            }

            //设置有序类别顺序
            if (data.getJSONArray("order") != null) {
                String oldColName = "";
                if (data.getString("columnName") != null) {
                    oldColName = data.getString("columnName");
                } else {
                    oldColName = data.getString("col");
                }
                JSONArray order = data.getJSONArray("order");
                categoryOrder.put(oldColName, order);
            }

            JSONObject newSemantics = new JSONObject();
            JSONObject newCategoryOrder = new JSONObject();
            for (Map.Entry<String, String> entry : tableMetaMap.entrySet()) {
                String name = entry.getKey();
                if (semantics!= null) {
                    newSemantics.put(name, semantics.getOrDefault(name, "null"));
                }else {
                    newSemantics.put(name, "null");
                }
                if (categoryOrder.getJSONArray(name) != null) {
                    newCategoryOrder.put(name, categoryOrder.getJSONArray(name));
                }
            }

            //更新output
            output.put("tableCols", tableMetaMap.keySet());
            output.put("columnTypes", tableMetaMap.values());
            output.put("semantic", newSemantics);
            output.put("categoryOrder", newCategoryOrder);
            output.put("tableName", data.getString("table"));
            data.put("inputInfo", dataJson);
            taskDTO.setDataJson(dataJson.toJSONString());
            taskMapper.update(taskDTO);
        }
    }

    /**
     * 查询最大拆分段数
     */
    private int queryMaxSplit(String sql) {

        try {
//            conn = gpDataProvider.getConn(DEFAULT_DATASET_ID);
//            Statement st = conn.createStatement();
//            ResultSet resultSet = st.executeQuery(sql);
//            if (resultSet.next()) {
//                return resultSet.getInt(1);
//            }
            String dataSourceKey = (String) servletContext.getAttribute(Constant.DEFAULT_DATA_SOURCE_KEY);
            return gpDataProvider.executeQuerySQL(dataSourceKey, sql, Integer.class);
        } catch (Exception e) {
            logger.error("queryMaxSplit fail, msg={}", e.getMessage());
        }
        return -1;
    }

    /**
     * 删除历史记录
     *
     * @param taskId 节点id
     * @param id     记录id
     */
    @Transactional(rollbackFor = Exception.class)
    public void deleteActions(Long taskId, Long id) {
        List<TaskInstanceDTO> actions = taskInstanceMapper.queryByTaskIdAndOrder(taskId);
        if (CollectionUtil.isNotEmpty(actions) && actions.get(0).getId().equals(id)) {
            throw new DataScienceException("原始数据无法删除");
        }
        int index = 0;
        for (int i = 0; i < actions.size(); i++) {
            TaskInstanceDTO dto = actions.get(i);
            JSONObject data = JSONObject.parseObject(dto.getDataJson());
            Long actionId = data.getLong("id");
            if (id.equals(actionId)) {
                index = i;
            }
        }
        if (index == 0) {
            //在列表中找不到要删除的那条。可能是重复点击了
            return;
        }
        this.clearActions(actions, taskId);
        TaskInstanceDTO remove = actions.remove(index);
        TaskDTO taskDTO = taskService.queryById(taskId);
        String table = getParentOutTable(taskDTO);
        //添加到撤销栈
        actionService
            .addCleanUpUndo(ActionEnum.CLEAN_DELETE,
                PipelineUtil.parseJSONObject(remove, index), null,
                taskDTO.getPipelineId());
        rebuildActions(actions, taskId, table);
    }

    public JSONObject updateOutPut(Long taskId, String targetTable) {
        TaskDTO taskDTO = taskMapper.queryById(taskId);
        JSONObject taskDataJson = JSONObject.parseObject(taskDTO.getDataJson());
        JSONArray output = taskDataJson.getJSONArray("output");
        JSONObject outputJson = output.getJSONObject(0);
        if (StringUtils.isEmpty(targetTable)) {
            //没有清洗记录，更新节点输出为节点的输入
            JSONArray input = taskDataJson.getJSONArray("input");
            JSONObject inputJson = input.getJSONObject(0);
            outputJson.put("tableCols", inputJson.getJSONArray("tableCols"));
            outputJson.put("columnTypes", inputJson.getJSONArray("columnTypes"));
            targetTable = inputJson.getString("tableName");
        }
        outputJson.put("tableName", targetTable);
        taskDataJson.put("isSimple", false);
        taskDTO.setDataJson(taskDataJson.toJSONString());
        taskMapper.update(taskDTO);

        //如果有子节点，级联更新子节点
        if (StringUtils.isNotEmpty(taskDTO.getChildId())) {
            String[] split = taskDTO.getChildId().split(",");
            for (String childTaskId : split) {
                TaskDTO childTask = taskMapper.queryById(Long.valueOf(childTaskId));
                //bug-BaseAlg.supplementForCheckbox里会对setParam做剔除。需要更新表名
                if (TaskTypeEnum.TASK_TYPE_ALGO.getVal().equals(childTask.getType())) {
                    updateSetParamTable(childTask, targetTable);
                }
                List<ApiResultCode> errorCode = new ArrayList<>();
                updateChild(childTask, taskDataJson, targetTable);
                boolean retFlag = taskService.simpleUpdate(childTask, errorCode);
                if (!retFlag) {
                    logger.error("TColumnService.updateOutPut simpleUpdate fail!!!!");
                    return null;
                }
            }
        }
        return taskDataJson;
    }

    /**
     * 更新子节点选择的特征列的表名
     */
    private void updateSetParamTable(TaskDTO task, String tableName) {
        JSONObject data = task.view().getData();
        JSONArray setParams = data.getJSONArray("setParams");
        if (CollectionUtil.isEmpty(setParams)) {
            return;
        }
        for (int i = 0; i < setParams.size(); i++) {
            JSONObject param = setParams.getJSONObject(i);
            if (!"feature_cols".equals(param.getString("english_name"))) {
                continue;
            }
            JSONArray values = param.getJSONArray("value");
            if (CollectionUtil.isEmpty(values)) {
                return;
            }
            JSONArray newValue = new JSONArray();
            for (int j = 0; j < values.size(); j++) {
                String selectCol = values.getString(j);
                String[] split = selectCol.split("\\.");
                newValue.add(tableName + "." + split[split.length - 1]);
            }
            param.put("value", newValue);
        }
        task.setDataJson(data.toJSONString());
    }

    /**
     * 查询action列表
     */
    public List<ColumnAction> queryAction(Long taskId) {
        List<TaskInstanceDTO> taskInstanceDTOS = taskInstanceMapper.queryByTaskIdAndOrder(taskId);
        List<ColumnAction> result = Lists.newArrayList();
        if (CollectionUtil.isNotEmpty(taskInstanceDTOS)) {
            taskInstanceDTOS
                .forEach(taskInstanceDTO -> result
                    .add(new ColumnAction(taskInstanceDTO.view())));
        }
        return result;
    }

    public TaskInstanceVO updateAction(TaskInstanceVO vo) {
        JSONObject data = vo.getData();
        Long id = data.getLong("id");
        List<TaskInstanceDTO> actions = taskInstanceMapper.queryByTaskIdAndOrder(vo.getTaskId());
        if (actions.get(0).getId().equals(id)) {
            throw new DataScienceException("原始数据无法修改");
        }
        this.clearActions(actions, vo.getTaskId());
        int index = 0;
        for (int i = 0; i < actions.size(); i++) {
            TaskInstanceDTO dto = actions.get(i);
            JSONObject jsonData = JSONObject.parseObject(dto.getDataJson());
            Long actionId = jsonData.getLong("id");
            if (id.equals(actionId)) {
                index = i;
            }
        }
        TaskInstanceDTO oldAction = actions.remove(index);
        actions.add(index, vo.toTask(vo));
        TaskDTO taskDTO = taskService.queryById(vo.getTaskId());
        String table = getParentOutTable(taskDTO);
        rebuildActions(actions, vo.getTaskId(), table);
        TaskInstanceDTO newAction = actions.get(index);
        actionService
            .addCleanUpUndo(ActionEnum.CLEAN_UPDATE,
                PipelineUtil.parseJSONObject(oldAction, index),
                PipelineUtil.parseJSONObject(newAction, index), taskDTO.getPipelineId());
        return newAction.view();
    }

    public JSONObject queryMinMax(ColumnQueryVO vo) {
        String defaultDataSourceKey = servletContext.getAttribute(Constant.DEFAULT_DATA_SOURCE_KEY).toString();
        String column = SqlUtil.formatPGSqlColName(vo.getName());
        String table = ToolUtil.alignTableName(vo.getTable(), 0L);
        Object min = null;
        Object max = null;
        JSONObject result = new JSONObject();
        Connection conn = null;
        try {
            String sql = String.format("SELECT MAX(%s), MIN(%s) FROM %s", column, column, table);
            Map queryResult = gpDataProvider.executeQuerySQL(defaultDataSourceKey, sql, Map.class);
            List<Object> objects = Arrays.asList(queryResult.values().toArray());
            if (CollectionUtil.isNotEmpty(objects)){
                max = objects.get(0);
                min = objects.get(1);
            }
            if (min instanceof Date) {
                result.put("min", TimeUtil.formatDate(((Date) min).getTime()));
                result.put("max", TimeUtil.formatDate(((Date) max).getTime()));
            } else {
                result.put("min", min);
                result.put("max", max);
            }
        } catch (Exception e) {
            logger.error("TColumnService queryMinMax error,column={}", table + "." + column);
        } finally {
            JDBCUtil.close(conn, null, null);
        }
        return result;
    }

    @Transactional(rollbackFor = Exception.class)
    public TaskRunnerResult execDataClean(TaskInstanceDTO instance, TaskManager taskManager) {
        TaskDTO currentTask = taskService.queryById(instance.getTaskId());
        TaskDTO parentTask = taskService.queryById(Long.valueOf(currentTask.getParentId()));
        //父节点是数据节点，直接成功
        // 临时注释掉
        if (parentTask.getType().equals(TaskTypeEnum.TASK_TYPE_DATA.getVal())) {
            return new TaskRunnerResult(0, null);
        }
        TaskInstanceDTO parentIns = taskInstanceService
            .queryLatestInstanceForTask(Long.valueOf(currentTask.getParentId()));
        JSONObject currentData = JSONObject.parseObject(currentTask.getDataJson());
        String table = "";
        //修改表名为真正的表名
        JSONArray parentOutPut = PipelineUtil.extractOutPut(parentTask, parentIns);
        for (int i = 0; i < parentOutPut.size(); i++) {
            if (!parentOutPut.getJSONObject(i).getString("tableName").contains("model")) {
                table = parentOutPut.getJSONObject(i).getString("tableName");
            }
        }
        //更新节点的输入输出
        currentData.put("input", parentOutPut);
        currentData.put("output", parentOutPut);
        currentTask.setDataJson(
            JSONObject.toJSONString(currentData,
                SerializerFeature.DisableCircularReferenceDetect));
        taskService.update(currentTask);

        List<TaskInstanceDTO> actions = taskInstanceService
            .queryByTaskIdAndOrder(currentTask.getId());
        //先删除action操作
        this.clearActions(actions, currentTask.getId());
        //重新生成actions
        String logInfo = rebuildActions(actions, currentTask.getId(), table);
        //如果有子节点，更新子节点sql
        if (StringUtils.isNotEmpty(currentTask.getChildId())) {
            String[] split = currentTask.getChildId().split(",");
            for (String childTaskId : split) {
                List<TaskFuture> taskFutures = taskManager.getTaskHolder()
                    .get(instance.getSessionId());

                for (TaskFuture taskFuture : taskFutures) {
                    TaskInstanceDTO runInstance = taskFuture.getInstance();
                    if (TaskTypeEnum.TASK_TYPE_CLEAN.getVal().equals(runInstance.getType())) {
                        continue;
                    }

                    if (Long.valueOf(childTaskId).equals(runInstance.getTaskId())) {
                        TaskDTO childTask = taskService.queryById(runInstance.getTaskId());
                        Long timeStamp = taskManager.getTimeHolder()
                            .get(runInstance.getSessionId());
                        //重新生成sql
                        String sql = taskService.initSql(childTask, timeStamp);
                        runInstance.setSqlText(sql);
                        taskInstanceService.update(runInstance);
                    }
                }
            }
        }
        if (StringUtils.isEmpty(logInfo)) {
            return new TaskRunnerResult(0, null);
        } else {
            return new TaskRunnerResult(500, logInfo);
        }
    }

    public String rebuildActions(List<TaskInstanceDTO> actions, Long currentTaskId, String table) {
        return rebuildActions(actions, currentTaskId, table, false);
    }

    public String rebuildActions(List<TaskInstanceDTO> actions, Long currentTaskId, String table, Boolean returnWhenFail) {
        String logInfo = null;
        String outPutTable = table;
        boolean status = true;

        Map<Long, Long> actionMap = Maps.newLinkedHashMap();
        List<TaskInstanceDTO> tempList = Lists.newArrayList();
        if (CollectionUtil.isNotEmpty(actions)) {
            for (TaskInstanceDTO action : actions) {
                JSONObject actionData = JSONObject.parseObject(action.getDataJson());
                actionData.put("table", !table.isEmpty() ? table: actionData.getString("table"));
                TaskInstanceVO view = action.view();
                TaskInstanceDTO newAction = addOneAction(actionData, view);
                if (status && null != newAction) {
                    actionMap.putIfAbsent(view.getData().getLong("id"),
                        JSONObject.parseObject(newAction.getDataJson()).getLong("id"));
                    tempList.add(newAction);
                    //添加action失败，记录第一次失败的信息
                    if (TaskInstanceStatus.FAIL.toString().equals(newAction.getStatus())) {
                        status = false;
                        logInfo = newAction.getLogInfo();
                        if (returnWhenFail){
                            //中途失败直接返回，针对项目复制任务
                            JSONObject dataJson = JSONObject.parseObject(newAction.getDataJson());
                            dataJson.put("logInfo", String.format(Constant.errorTpl, "因功能升级，此节点无法直接复用原有项目的内容，请手动调整"));
                            newAction.setDataJson(dataJson.toJSONString());
                            taskInstanceMapper.update(newAction);
                            break;
                        }
                    } else {
                        //添加action成功，记录输出表
                        outPutTable = actionData.getString("table");
                    }
                }else {
                    tempList.add(action);
                }
                table = actionData.getString("table");
            }
            //清洗节点的输出更新
            updateOutPut(currentTaskId, outPutTable);
            //更新widget对应的action
            List<WidgetDTO> taskWidgets = widgetMapper.queryByTaskIdAndType(currentTaskId);
            updateWidgets(taskWidgets, actionMap);

            actions.clear();
            actions.addAll(tempList);
        }
        return logInfo;
    }

    /**
     * 在清洗历史记录更新后，重新绑定widget的dataId
     *
     * @param taskWidgets 本节点下的widgets
     * @param actionMap   旧actionId到新actionId的映射
     */
    private void updateWidgets(List<WidgetDTO> taskWidgets, Map<Long, Long> actionMap) {
        if (CollectionUtil.isEmpty(taskWidgets)) {
            return;
        }
        for (WidgetDTO widget : taskWidgets) {
            JSONObject widgetData = JSONObject.parseObject(widget.getDataJson());
            JSONObject chartOptions = widgetData.getJSONObject("chartOptions");
            JSONObject formData = widgetData.getJSONObject("formData");
            if (null != formData) {
                if (!"tclean".equals(formData.getString("dataType"))) {
                    continue;
                }
                Long dataId = formData.getLong("dataId");
                Long newDataId = actionMap.get(dataId);
                if (null != newDataId) {
                    chartOptions.put("dataId", newDataId);
                    formData.put("dataId", newDataId);
                    widget.setDataJson(widgetData.toJSONString());
                    widgetMapper.update(widget);
                } else {
                    //action被删掉了，把widget也删掉
                    widgetMapper.deleteById(widget.getId());
                }
            }
        }
    }

    /**
     * 模型参数获取
     *
     * @param taskId
     * @return
     */
    public JSONObject getModelParamsForTask(Long taskId) {
        if (null == taskId) {
            return null;
        }
        TaskInstanceDTO taskInstanceDTO = taskInstanceService.queryLatestInstanceForTask(taskId);
        if (taskInstanceDTO == null) {
            return null;
        }
        String logInfo = taskInstanceDTO.getLogInfo();
        JSONObject logObj;
        try {
            logObj = JSONObject.parseObject(logInfo);
            if (logObj.containsKey("result") && logObj.getJSONObject("result")
                .containsKey("model_params")) {
                JSONObject modelParams = logObj.getJSONObject("result")
                    .getJSONObject("model_params");
                if (modelParams == null || modelParams.size() == 0) {
                    return null;
                }
                return modelParams;
            } else {
                return null;
            }
        } catch (Exception e) {
            return null;
        }
    }

    /**
     * 可能是算子节点，可能没有运行过。先检查数据表是否存在
     */
    public boolean checkTableIfExits(ColumnQueryVO vo) {
        if (!vo.getTable().endsWith("_")) {
            return true;
        }
        if (null == vo.getTaskId()) {
            return false;
        }
        synchronized (cache) {
            String tableVal = cache.getIfPresent(vo.getTaskId());
            if (null != tableVal) {
                if ("".equals(tableVal)) {
                    return false;
                }
                vo.setTable(tableVal);
                return true;
            }
            TaskInstanceDTO taskInstanceDTO = taskInstanceService
                .queryLatestInstanceForTask(vo.getTaskId());
            if (null == taskInstanceDTO) {
                cache.put(vo.getTaskId(), "");
                return false;
            }
            List<String> tables = Lists.newArrayList();
            PipelineUtil
                .extractTableName(Lists.newArrayList(taskInstanceDTO), tables);
            for (String table : tables) {
                if (table.contains(vo.getTable())) {
                    vo.setTable(table);
                    cache.put(vo.getTaskId(), table);
                    return true;
                }
            }
            cache.put(vo.getTaskId(), "");
        }
        return false;
    }

    public void updateChild(TaskDTO childTask, JSONObject currentData, String outPutTable) {
        //更新子节点的data信息
        JSONObject childData = JSONObject.parseObject(childTask.getDataJson());
        childData.put("input", currentData.getJSONArray("output"));
        if (TaskTypeEnum.TASK_TYPE_ALGO.getVal().equals(childTask.getType())) {
            childData.put("source_table", outPutTable);
        }
        childTask.setDataJson(
            JSONObject
                .toJSONString(childData, SerializerFeature.DisableCircularReferenceDetect));
    }

    /**
     * 添加一条原始数据操作记录
     */
    public void updateOriginalDataAction(Long preTaskId, Long currentId) {
        List<TaskInstanceDTO> actions = taskInstanceMapper.queryByTaskIdAndOrder(currentId);
        TaskDTO taskDTO = taskMapper.queryById(currentId);

        TaskInstanceDTO originalAction = actions.get(0);
        JSONObject ActionDataJson = JSONObject.parseObject(originalAction.getDataJson());
        if (ActionDataJson.getString("action").equals(ActionEnum.ORIGINAL_DATA.name())) {

            TaskDTO parentNode = taskMapper.queryById(preTaskId);
            JSONObject parentDataJson = JSONObject.parseObject(parentNode.getDataJson());
            JSONObject parentOutput = parentDataJson.getJSONArray("output").getJSONObject(0);
            JSONObject parentOutput2 = SerializationUtils
                .clone(parentDataJson.getJSONArray("output").getJSONObject(0));
            String parentTable = parentOutput.getString("tableName");
            //update table prop
            ActionDataJson
                .put("table", StringUtil.replaceTableName(parentTable, ActionDataJson.getString("table")));

            JSONObject oldInputInfo = ActionDataJson.getJSONObject("inputInfo");

            //update inputInfo -> input
            oldInputInfo
                .put("input", replaceTableName(parentOutput, oldInputInfo.getJSONArray("input")));

            //update inputInfo -> output
            oldInputInfo
                .put("output", replaceTableName(parentOutput2, oldInputInfo.getJSONArray("output")));
            originalAction.setDataJson(ActionDataJson.toJSONString());

            taskInstanceMapper.update(originalAction);
            //保留原始节点位置  不然会漂移
            JSONObject currPosition = JSONObject.parseObject(taskDTO.getDataJson())
                .getJSONObject("position");
            oldInputInfo.put("position", currPosition);
            taskDTO.setDataJson(oldInputInfo.toJSONString());
            taskMapper.update(taskDTO);

            this.clearActions(actions, currentId);

            String table = getParentOutTable(taskDTO);
            rebuildActions(actions, currentId, table);
        }
    }

    private JSONArray replaceTableName(JSONObject newJson, JSONArray oldJson) {
        JSONObject oldObj = oldJson.getJSONObject(0);
        String newTableName = newJson.getString("tableName");
        String oldTableName = oldObj.getString("tableName");
        String replacedTableName = StringUtil.replaceTableName(newTableName, oldTableName);
        newJson.put("tableName", replacedTableName);
        JSONArray result = new JSONArray();
        result.add(newJson);
        return result;
    }


    /**
     * 添加一条原始数据操作记录
     */
    public TaskInstanceDTO addOriginalDataAction(TaskDTO taskDTO) {
        TaskInstanceVO vo = new TaskInstanceVO();
        vo.setProjectId(taskDTO.getProjectId());
        vo.setTaskId(taskDTO.getId());
        vo.setTaskName(taskDTO.getName());
        vo.setUserId(JwtUtil.getCurrentUserId());
        JSONObject dataJson = JSONObject.parseObject(taskDTO.getDataJson());
        JSONArray input = dataJson.getJSONArray("input");
        String tableName = input.getJSONObject(0).getString("tableName");
        //连线时或添加节点时，检查一下父节点是否运行过。运行过就取父节点的表
        boolean parentNotRun = false;
        if (tableName.endsWith("_")) {
            Long parentId = Long.valueOf(taskDTO.getParentId());
            TaskDTO parentTask = taskService.queryById(parentId);
            JSONObject parentData = JSONObject.parseObject(parentTask.getDataJson());
            String lastStatus = parentData.getString("lastStatus");
            if ("SUCCESS".equals(lastStatus)) {
                tableName = tableName + parentData.getLong("lastTimeStamp");
            } else {
                parentNotRun = true;
            }
        }
        if (!parentNotRun) {
            tableName = ToolUtil.alignTableName(tableName, 0L);
            updateOutPut(taskDTO.getId(), tableName);
        }
        JSONObject data = new JSONObject();
        data.put("action", ActionEnum.ORIGINAL_DATA.toString());
        data.put("table", tableName);
        data.put("description", ActionEnum.ORIGINAL_DATA.label());
        vo.setData(data);
        return addOneAction(data, vo);
    }

    /**
     * 查询单个action
     */
    public ColumnAction queryActionById(Long id) {
        TaskInstanceDTO taskInstanceDTO = taskInstanceMapper.queryById(id);
        if (null == taskInstanceDTO) {
            return null;
        }
        return new ColumnAction(taskInstanceDTO.view());
    }

    /**
     * 查询3个值，推荐分隔符
     *
     * @param vo
     * @return
     */
    public JSONObject recommendSeparator(ColumnQueryVO vo) {
        String defaultDataSourceKey = servletContext.getAttribute(Constant.DEFAULT_DATA_SOURCE_KEY).toString();
        vo.setTable(ToolUtil.alignTableName(vo.getTable(), 0l));
        JSONObject result = new JSONObject();
        Connection conn = null;
        try {
            conn = gpDataProvider.getConn(DEFAULT_DATASET_ID);
            Statement st = conn.createStatement();
            String sql = String.format("select %s from %s limit 3", vo.getName(), vo.getTable());
            ResultSet rs = st.executeQuery(sql);
            List<LinkedHashMap> queryResult = gpDataProvider.executeQuerySQL(defaultDataSourceKey, sql, JSONArray.class).toJavaList(LinkedHashMap.class);
            Set set = Sets.newLinkedHashSet();
            while (rs.next()) {
                String str = rs.getString(1);
                if (StringUtils.isNotEmpty(str)) {
                    if (!str.matches("(.*)([\\W_])(.*)")) {
                        continue;
                    }
                    //分隔符为空格，前端展示不明显，此处不推荐空格分隔符
                    str = str.replaceAll("([a-zA-Z0-9 ]*)([\\W_])(.*)", "$2").trim();
                }
                if (StringUtils.isNotEmpty(str)) {
                    set.add(str);
                }
            }
            JSONArray array = new JSONArray();
            array.addAll(set);
            result.put("separators", array);
        } catch (Exception e) {
            logger.error("TColumnService.recommendSeparator fail, msg={}", e.getMessage());
        }
        finally {
            JDBCUtil.close(conn, null, null);
        }
        return result;
    }

    public List<TaskInstanceDTO> queryByTaskIdAndOrder(Long taskId) {
        return taskInstanceMapper.queryByTaskIdAndOrder(taskId);
    }

    public boolean saveAction(TaskInstanceDTO action) {
        return taskInstanceMapper.save(action);
    }


    public JSONObject recommendValue(String selectValue, String semantic)
        throws BadHanyuPinyinOutputFormatCombination {
        List<String> recommendList = urbanDataService.recommendValue(selectValue, semantic);
        JSONObject result = new JSONObject();
        result.put("values", recommendList);
        return result;
    }

    public JSONObject cleanData(ColumnQueryVO vo) {
        String defaultDataSourceKey = servletContext.getAttribute(Constant.DEFAULT_DATA_SOURCE_KEY).toString();
        String table = vo.getTable();
        JSONObject result = new JSONObject();
        Map<String, Object> logInfo = Maps.newHashMap();

        Connection conn = null;
        try {
            conn = gpDataProvider.getConn(DEFAULT_DATASET_ID);
            Statement st = conn.createStatement();
            String sql = String.format(DELETE_REDUNDANT_DATA_SQL, table, table);
            int count = st.executeUpdate(sql);
//            Integer count = gpDataService.executeUpdateSQL(defaultDataSourceKey, sql).getResult();
            if (count > 0) {
                logInfo.put("count", count);
                logInfo.put("msg", "Clean Data Successfully.");
            } else {
                logInfo.put("count", count);
                logInfo.put("msg", "Clean Data Failed, Or dont need to clean data");
            }
        } catch (Exception e) {
            logger.error("clean redundant data fail, msg={}", e.getMessage());
            result.put("log", logInfo);
            return result;
        }
        finally {
            JDBCUtil.close(conn, null, null);
        }
        result.put("log", logInfo);
        return result;
    }

    public JSONObject queryIndex(JSONObject param) {
        String defaultDataSourceKey = servletContext.getAttribute(Constant.DEFAULT_DATA_SOURCE_KEY).toString();
        String col = param.getString("col");
        String table = param.getString("table");
        if (!table.contains(".")) {
            table = "dataset." + table;
        }
        String type = param.getString("type");
        String sql = "";
        if (type.equals("json")) {
            sql = String
                .format("select distinct json_object_keys(\"%s\"::json) as keys from %s", col,
                    table);
        } else if (type.equals("array")) {
            sql = String.format("select max(json_array_length(\"%s\"::json)) as max from %s", col,
                table);
        }

        JSONArray index = new JSONArray();
        Connection conn = null;
        try {
            conn = gpDataProvider.getConn(DEFAULT_DATASET_ID);
            Statement st = conn.createStatement();
            ResultSet rs = st.executeQuery(sql);
            ResultSetMetaData metaData = rs.getMetaData();
            while (rs.next()) {
                JSONObject jsonObject = new JSONObject();
                for (int i = 1; i < metaData.getColumnCount() + 1; i++) {
                    String name = metaData.getColumnName(i);
                    jsonObject.put(name, rs.getObject(i));
                }
                index.add(jsonObject);
            }
        } catch (Exception e) {
            logger.error("clean redundant data fail, msg={}", e.getMessage());
        } finally {
            JDBCUtil.close(conn, null, null);
        }

        JSONObject result = new JSONObject();
        if (type.equals("array")) {
            int max=((JSONObject) index.get(0)).getInteger("max");
            index = new JSONArray();
            for (int i = 0; i < max; i++) {
                index.add(i);
            }
        }
        result.put("index", index);
        return result;
    }

    public JSONArray queryOrder(JSONObject param) {
//        String defaultDataSourceKey = servletContext.getAttribute(Constant.DEFAULT_DATA_SOURCE_KEY).toString();
        TaskDTO task = taskService.queryById(param.getLong("taskId"));
        String col = param.getString("col");
        JSONArray output = JSONObject.parseObject(task.getDataJson()).getJSONArray("output");
        if (output.size() > 0 && output.getJSONObject(0).getJSONObject("categoryOrder") != null &&
            output.getJSONObject(0).getJSONObject("categoryOrder").getJSONArray(col) != null) {
            return output.getJSONObject(0).getJSONObject("categoryOrder").getJSONArray(col);
        } else {
            String table = param.getString("table");
            if (!table.contains(".")) {
                table = "dataset." + table;
            }
            String sql = String.format("select distinct %s from %s", col, table);
            Set<String> order = gpDataProvider.executeQueryAsOneSet(sql);
//            Set order = gpDataService.executeQuerySQL(defaultDataSourceKey, sql, Set.class).getResult();
            Set<String> result = (Set<String>) order.stream().map(Object::toString).collect(Collectors.toSet());
            return JSONArray.parseArray(JSONObject.toJSONString(result.toArray()));
        }
    }

    public JSONObject formatList(JSONObject param){
        String defaultDataSourceKey = servletContext.getAttribute(Constant.DEFAULT_DATA_SOURCE_KEY).toString();
        JSONObject ret = new JSONObject();
        String col = param.getString("col");
        String type = param.getString("type");
        String semantic = param.getString("semantic");
        String table = param.getString("table");
        if (!table.contains(".")) {
            table = "dataset." + table;
        }

        Connection conn = null;
        try {
            conn = gpDataProvider.getConn(DEFAULT_DATASET_ID);
            Statement st = conn.createStatement();

            String sql = String.format("select \"%s\" from %s where \"%s\" is not null limit 1", col, table, col);
            String sample = gpDataProvider.executeQuery(st, sql).getJSONObject(0).getString(col);
//            String sample = gpDataService.executeQuerySQL(defaultDataSourceKey, sql, String.class).getResult();
            String key = String.format("'%s'", sample);

            if (type.equals(DatasetConstant.DATA_DATE)){
                for (PythonDateTypeFormatEnum item: PythonDateTypeFormatEnum.values()){
                    sql = String.format("select pipeline.sys_func_format_time('%s', '%s') as \"%s\"", sample, item.getVal(), col);
                    String example = gpDataProvider.executeQuery(st, sql).getJSONObject(0).getString(col);
//                    String example = gpDataService.executeQuerySQL(defaultDataSourceKey, sql, String.class).getResult();
                    ret.put(item.getDesc(), example);
                }
                return ret;
            }
            List<String> sqlItems = new ArrayList<>();
            switch (semantic){
                case "country":
                    sqlItems = SemanticUtil.semanticItems(key, SemanticConstant.COUNTRY_COL, "", "");
                    sql = String.format("select %s from dataset._country_mapper_ where %s",
                        Joiner.on(",").join(SemanticConstant.COUNTRY_COL), Joiner.on(" or ").join(sqlItems));
                    break;
                case "province":
                    sqlItems = SemanticUtil.semanticItems(key, SemanticConstant.PROVINCE_COL, "", "");
                    sql = String.format("select %s from dataset._province_mapper_ where %s",
                        Joiner.on(",").join(SemanticConstant.PROVINCE_COL), Joiner.on(" or ").join(sqlItems));
                    break;
                case "city":
                    sqlItems = SemanticUtil.semanticItems(key, SemanticConstant.CITY_COL, "", "");
                    sql = String.format("select %s from dataset._city_mapper_ where %s",
                        Joiner.on(",").join(SemanticConstant.CITY_COL), Joiner.on(" or ").join(sqlItems));
                    break;
            }
            JSONArray data =  gpDataProvider.executeQuery(st, sql);
//            JSONArray data = gpDataService.executeQuerySQL(defaultDataSourceKey, sql, JSONArray.class).getResult();
            JSONObject jsonObject = data.getJSONObject(0);
            for (Map.Entry entry : jsonObject.entrySet()) {
                if (entry.getValue() != null && entry.getValue() != sample) {
                    ret.put((String) entry.getKey(), entry.getValue());
                }
            }
        } catch (Exception e) {
            logger.error("query formatList failed, since = {}", e.getMessage());
        }
        finally {
            JDBCUtil.close(conn, null, null);
        }
        return ret;
    }


    public JSONObject multiColumnSearch(MultiColumnQueryVO vo) {
        TaskDTO taskDTO = taskMapper.queryById(vo.getTaskId());
        if (taskDTO.getType().equals(TaskTypeEnum.TASK_TYPE_MODEL.getVal())) {
            try {
                String dataJson = taskDTO.getDataJson();
                JSONObject data = JSONObject.parseObject(dataJson);
                JSONArray outputarray = data.getJSONArray("output");
                JSONObject output = outputarray.getJSONObject(0);
                String otableName = output.getString("tableName");
                if (!otableName.contains(".")) {
                    otableName = "dataset." + otableName;
                }
                vo.setTable(otableName);
            } catch (Exception e) {
                logger.info("unable to set table name");
            }
        } else {
            vo.setTable(ToolUtil.alignTableName(vo.getTable(), 0L));
        }

        String searchStr = generateSearch(vo.getColumns());
        Page<JSONArray> page = new Page<>();
        int curPage = vo.getCurPage();
        int pageSize = vo.getPageSize();

        String tableName = vo.getTable();

        int offset = (vo.getCurPage() - 1) * vo.getPageSize();

        page.setPageSize(pageSize);
        page.setCurPage(curPage);

        JSONObject result = new JSONObject();
        Connection conn = null;
        Integer recordsAmount = 0;
        try {
            //查询数据的总条数
            conn = gpDataProvider.getConn(DEFAULT_DATASET_ID);

            String sql = String.format(QUERY_TOTAL_SQL, vo.getTable(), searchStr);
            PreparedStatement pst = conn.prepareStatement(sql);
            for (int i=0;i<vo.getColumns().size();i++) {
                pst.setString(i+1,"%"+vo.getSearch()+"%");
            }
            sql = sql.replaceAll("\\?", "'%" + vo.getSearch() + "%'");
            ResultSet rs = pst.executeQuery();
            if (rs.next()) {
                recordsAmount = rs.getInt(1);
                page.setTotalElementsAndPage(recordsAmount);
            }

            // TSNE flag (TSNE算法 缺少 _record_id_列，需要增加)
            Map<String, Integer> currDataColMap = gpDataProvider
                .getColumnTypesOriginal(new Table(DEFAULT_DATASET_ID, vo.getTable()));

            boolean record_id_flag = false;
            if (currDataColMap.containsKey(DatasetConstant.DEFAULT_ID_FIELD)) {
                record_id_flag = true;
                sql = String.format("SELECT * FROM %s %s order by _record_id_ LIMIT %s OFFSET %s",
                    vo.getTable(), searchStr, pageSize, offset);
            } else {
                sql = String.format("SELECT * FROM %s %s LIMIT %s OFFSET %s",
                    vo.getTable(), searchStr, pageSize, offset);
            }

            pst.clearParameters();
            pst = conn.prepareStatement(sql);

            for (int i=0;i<vo.getColumns().size();i++) {
                pst.setString(i+1,"%"+vo.getSearch()+"%");
            }

            //真实数据查询
            rs = pst.executeQuery();
            ResultSetMetaData meta = rs.getMetaData();
            int colCount = meta.getColumnCount();

            //生成结果的head结构
            List<String> colNames = new ArrayList<>();

            //若为清洗节点从taskInstance取元数据，否则从task取元数据，若出错则从数据库取元数据
            try {
                int taskType = taskDTO.getType();
                JSONObject info = new JSONObject();
                if (taskType == TaskTypeEnum.TASK_TYPE_CLEAN.getVal()) {
                    TaskInstanceDTO taskInstance = taskInstanceService.queryInstanceByTableName(tableName, vo.getTaskId());
                    info = JSONObject.parseObject(taskInstance.getDataJson()).getJSONObject("inputInfo").getJSONArray("output")
                            .getJSONObject(0);
                } else {
                    info = JSONObject.parseObject(taskDTO.getDataJson()).getJSONArray("output")
                            .getJSONObject(0);
                }
                JSONArray tableCols = info.getJSONArray("tableCols");
                JSONArray columnTypes = info.getJSONArray("columnTypes");

                JSONObject types = new JSONObject();
                for (int i = 0; i < tableCols.size(); i++) {
                    types.put(tableCols.getString(i), columnTypes.getString(i));
                }
                for (int i = 1; i < colCount + 1; i++) {
                    if (DatasetConstant.DEFAULT_ID_FIELD.equals(meta.getColumnName(i))) {
                        colNames.add(meta.getColumnName(i));
                        continue;
                    }
                    if (meta.getColumnName(i).contains(ColumnConstant.ROW_NUM)) {
                        continue;
                    }
                    String columnName = meta.getColumnName(i);
                    if (types.containsKey(columnName)) {
                        colNames.add(meta.getColumnName(i));
                    }
                }
            } catch (Exception e) {
                for (int i = 1; i < colCount + 1; i++) {
                    if (DatasetConstant.DEFAULT_ID_FIELD.equals(meta.getColumnName(i))) {
                        colNames.add(meta.getColumnName(i));
                        continue;
                    }
                    if (meta.getColumnName(i).contains(ColumnConstant.ROW_NUM)) {
                        continue;
                    }
                    colNames.add(meta.getColumnName(i));
                }
            }

            JSONArray ja = new JSONArray();
            /* 生成data结构 */
            long fake_id = 1L;
            while (rs.next()) {
                JSONObject column = new JSONObject();
                for (String colName : colNames) {
                    column.put(colName, rs.getString(colName));
                }
                // TSNE flag
                if (!record_id_flag) {
                    column.put("_record_id_", fake_id++);
                }
                // TSNE flag
                ja.add(column);
            }

            result.put("data", ja);
            result.put("page", page);
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("TColumnService multiColumnSearch error,column={}, errMsg={}",
                vo.getColumns(), e.getMessage());
        }
        finally {
            JDBCUtil.close(conn, null, null);
        }

        return result;
    }

    public String generateSearch(Set<String> columns) {
        if (columns==null||columns.isEmpty()) {
            return "";
        }
        StringBuilder sb = new StringBuilder();
        int i = 0;
        for (String column:columns) {
            if (i==0) {
                sb.append(" where  cast(\"").append(column).append("\" as varchar ) like ? ");
            } else {
                sb.append("or cast(\"").append(column).append("\" as varchar ) like ? ");
            }
            i++;
        }
        return sb.toString();
    }

    /**
     * 获取指定百分比位置的数据
     * @param vo
     * @return
     */
    public JSONObject queryPercentageData(PercentageDataVO vo) {
        String defaultDataSourceKey = servletContext.getAttribute(Constant.DEFAULT_DATA_SOURCE_KEY).toString();
//        Connection conn = null;
//        PreparedStatement pst = null;

        try {
//            conn = gpDataProvider.getConn(DEFAULT_DATASET_ID);
            String table = vo.getTable();
            if (!table.contains(".")) {
                table = "dataset."+table;
            }
            String col = SqlUtil.formatPGSqlColName(vo.getCol());
            Double percentage = vo.getPercentage();
            String sql = String.format(QUERY_TOTAL_SQL,table," where "+col +" is not null");

//            pst = conn.prepareStatement(sql);
            Integer count = gpDataProvider.executeQuerySQL(defaultDataSourceKey, sql, Integer.class);
//            ResultSet rs = pst.executeQuery();
//            int count = 0;
//            while (rs.next()) {
//                count = rs.getInt(1);
//            }
            if (count==0) {
                return null;
            }
            long offset = Math.round(count*percentage/100d);
            offset = offset<1?0:offset-1;
            col = SqlUtil.formatPGSqlColName(col);

            sql = "select "+col+" from "+table+" where "+col +" is not null order by "+col+" limit 1 offset " + offset;

//            pst.clearParameters();

//            pst = conn.prepareStatement(sql);
//            pst.setLong(1,offset);
//            rs = pst.executeQuery();
//            Object res = null;
//            while (rs.next()) {
//                res = rs.getObject(1);
//            }
            Map<String, Object> queryResult = gpDataProvider.executeQuerySQL(defaultDataSourceKey, sql, Map.class);
            JSONObject jo = new JSONObject();
            jo.put("percentage",percentage);
            jo.put("value",queryResult.values().toArray()[0]);
            return jo;
        } catch (Exception e) {
            logger.error(e.getMessage(),e);
            throw new DataScienceException("获取百分比位置数据异常");
        }
//        finally {
//            JDBCUtil.close(conn, null, null);
//        }
    }

    public int queryMaxSplitNum(JSONObject data) {
        String sql = DataCleanSqlHelper.initQueryMaxSplitSql(data);
        int maxSplit = queryMaxSplit(sql);
        return maxSplit;
    }
}
